use crate::{
AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot, UnindexedAccountEvent,
UnindexedAccountSnapshot,
balance::{AssetBalance, Balance},
client::ExecutionClient,
error::{ApiError, ConnectivityError, OrderError, UnindexedClientError, UnindexedOrderError},
order::{
Order, OrderKey, OrderKind, TimeInForce,
id::{ClientOrderId, OrderId, StrategyId},
request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
state::{Cancelled, Filled, Open, OrderState, UnindexedOrderState},
},
trade::{AssetFees, Trade, TradeId},
};
use binance_sdk::{
common::{
config::{ConfigurationRestApi, ConfigurationWebsocketApi},
errors::WebsocketError,
models::WebsocketEvent,
},
spot::{
SpotRestApi, SpotWsApi,
rest_api::{GetAccountParams, GetOpenOrdersParams, MyTradesParams, RestApi},
websocket_api::{
OrderCancelParams, OrderPlaceParams, OrderPlaceSideEnum, OrderPlaceTimeInForceEnum,
OrderPlaceTypeEnum, UserDataStreamEventsResponse,
UserDataStreamSubscribeSignatureParams, WebsocketApi, WebsocketApiHandle,
},
},
};
use chrono::{DateTime, TimeZone, Utc};
use futures::stream::BoxStream;
use lru::LruCache;
use rust_decimal::Decimal;
use rustrade_instrument::{
Side, asset::name::AssetNameExchange, exchange::ExchangeId,
instrument::name::InstrumentNameExchange,
};
use serde::Deserialize;
use smol_str::{SmolStr, format_smolstr};
use std::{
num::NonZeroUsize,
pin::Pin,
str::FromStr,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{RwLock, mpsc, oneshot};
use tracing::{debug, error, info, trace, warn};
/// Stream wrapper that owns the handle of a spawned task and aborts that task
/// when the wrapper is dropped.
///
/// All stream behaviour is delegated to `inner`; the wrapper only adds the
/// abort-on-drop side effect.
struct AbortOnDropStream<S> {
    /// The stream whose items are forwarded unchanged.
    inner: S,
    /// Handle of the task that is aborted in `Drop`.
    handle: tokio::task::JoinHandle<()>,
}

impl<S> AbortOnDropStream<S> {
    /// Couples `inner` with the task `handle` that should not outlive it.
    fn new(inner: S, handle: tokio::task::JoinHandle<()>) -> Self {
        Self { inner, handle }
    }
}

impl<S> futures::Stream for AbortOnDropStream<S>
where
    S: futures::Stream + Unpin,
{
    type Item = S::Item;

    /// Polls the wrapped stream and returns its result as-is.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll_next(cx)
    }

    /// Reports the wrapped stream's size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl<S> Drop for AbortOnDropStream<S> {
    /// Requests cancellation of the associated task.
    fn drop(&mut self) {
        self.handle.abort();
    }
}
// First reconnect delay used by `ExponentialBackoff`; doubled on each attempt.
const INITIAL_BACKOFF_MS: u64 = 1_000;
// Upper bound applied to the computed reconnect delay.
const MAX_BACKOFF_MS: u64 = 30_000;
// Number of backoff waits allowed before `ExponentialBackoff::wait` returns `false`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
// Period of the heartbeat check performed by `connection_manager`.
const HEARTBEAT_TIMEOUT_SECS: u64 = 30;
// Maximum time `connection_manager` waits for `recover_fills` after a reconnect.
const FILL_RECOVERY_TIMEOUT_SECS: u64 = 30;
// Maximum time `BinanceSpot::get_ws_api` waits for the WebSocket connect.
const CONNECT_TIMEOUT_SECS: u64 = 15;
// Extra margin subtracted from the detected disconnect time when computing
// the start of the fill-recovery window.
const SIGNAL_RECOVERY_LOOKBACK_MS: i64 = 500;
// Capacity of the LRU cache used to drop duplicate account events.
const DEDUP_CACHE_SIZE: usize = 10_000;
// Page size requested from `my_trades`; a shorter page ends pagination.
const BINANCE_MAX_TRADES: usize = 1000;
// Compile-time guard for the `BINANCE_MAX_TRADES as i32` cast in `paginate_my_trades`.
const _: () = assert!(
BINANCE_MAX_TRADES <= i32::MAX as usize,
"BINANCE_MAX_TRADES overflows i32"
);
// Cooldown applied by `RateLimitTracker::on_rate_limited` when no delay is supplied.
const DEFAULT_RATE_LIMIT_DELAY_SECS: u64 = 10;
// Number of retries `rest_call_with_retry` performs on rate-limit errors.
const MAX_RATE_LIMIT_RETRIES: u32 = 3;
/// Category of account event tracked by the dedup cache.
///
/// Part of [`DedupKey`], so the same id can be recorded once per category.
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
enum DedupEventKind {
// A trade (fill) event, keyed by trade id.
Trade,
// An order snapshot in the `Open` state, keyed by order id.
New,
// A successful cancel response, keyed by order id.
Cancelled,
}
/// Identity of an account event for deduplication purposes.
///
/// Built by `dedup_key_from_event`; two events with equal keys are treated as
/// the same event by `is_duplicate`.
#[derive(Debug, Hash, Eq, PartialEq)]
struct DedupKey {
// Exchange instrument name the event belongs to.
instrument: SmolStr,
// Trade id or order id, depending on `kind`.
id: SmolStr,
// Event category.
kind: DedupEventKind,
}
/// LRU set of recently seen [`DedupKey`]s, shareable between tasks and callbacks.
type SharedDedupCache = Arc<parking_lot::Mutex<LruCache<DedupKey, ()>>>;

/// Creates an empty dedup cache holding up to `DEDUP_CACHE_SIZE` keys.
fn new_dedup_cache() -> SharedDedupCache {
    // `DEDUP_CACHE_SIZE` is a non-zero constant, so the conversion cannot fail.
    #[allow(clippy::unwrap_used)]
    let capacity = NonZeroUsize::new(DEDUP_CACHE_SIZE).unwrap();
    let cache = LruCache::new(capacity);
    Arc::new(parking_lot::Mutex::new(cache))
}
/// Derives the deduplication key for an account event, if it has one.
///
/// Keys are produced for:
/// - trades (trade id),
/// - order snapshots whose state is active and `Open` (order id),
/// - successful cancel responses (order id).
///
/// Every other event, including failed cancel responses, yields `None` and is
/// therefore never deduplicated.
fn dedup_key_from_event(event: &UnindexedAccountEvent) -> Option<DedupKey> {
    use crate::order::state::ActiveOrderState;

    // Resolve the (instrument, id, kind) triple first, then build the key once.
    let (instrument, id, kind) = match &event.kind {
        AccountEventKind::Trade(trade) => (
            trade.instrument.name(),
            &trade.id.0,
            DedupEventKind::Trade,
        ),
        AccountEventKind::OrderSnapshot(snap) => match &snap.0.state {
            OrderState::Active(ActiveOrderState::Open(open)) => (
                snap.0.key.instrument.name(),
                &open.id.0,
                DedupEventKind::New,
            ),
            _ => return None,
        },
        AccountEventKind::OrderCancelled(resp) => match &resp.state {
            Ok(cancelled) => (
                resp.key.instrument.name(),
                &cancelled.id.0,
                DedupEventKind::Cancelled,
            ),
            Err(_) => return None,
        },
        _ => return None,
    };

    Some(DedupKey {
        instrument: instrument.clone(),
        id: id.clone(),
        kind,
    })
}
/// Returns `true` when `key` is already in the cache; otherwise records it and
/// returns `false`.
///
/// The lookup does not refresh the key's position in the LRU order, so a key
/// is evicted based on when it was first inserted.
fn is_duplicate(cache: &SharedDedupCache, key: DedupKey) -> bool {
    let mut seen = cache.lock();
    let already_seen = seen.contains(&key);
    if !already_seen {
        seen.put(key, ());
    }
    already_seen
}
/// Shared cooldown used to pause REST calls after the exchange reports a rate limit.
///
/// The tracker stores a single deadline. Callers await [`Self::wait_if_blocked`]
/// before a request and call [`Self::on_rate_limited`] when a request was rejected
/// for rate-limit reasons.
struct RateLimitTracker {
    /// Instant until which requests should be held back; `None` if never rate-limited.
    blocked_until: parking_lot::Mutex<Option<tokio::time::Instant>>,
}

impl RateLimitTracker {
    /// Creates a tracker with no active cooldown.
    fn new() -> Self {
        Self {
            blocked_until: parking_lot::Mutex::new(None),
        }
    }

    /// Sleeps until the recorded deadline has passed.
    ///
    /// The deadline is re-read after every sleep because another caller may
    /// have extended it in the meantime. The lock is never held across an
    /// `.await`.
    async fn wait_if_blocked(&self) {
        loop {
            let deadline = *self.blocked_until.lock();
            let Some(until) = deadline else {
                return;
            };
            let now = tokio::time::Instant::now();
            if until <= now {
                return;
            }
            #[allow(clippy::cast_possible_truncation)]
            let delay_ms = (until - now).as_millis() as u64;
            debug!(
                delay_ms,
                "BinanceSpot REST rate-limited, waiting before request"
            );
            tokio::time::sleep_until(until).await;
        }
    }

    /// Records a rate-limit response and starts or extends the cooldown.
    ///
    /// `retry_after` is the delay to apply; `DEFAULT_RATE_LIMIT_DELAY_SECS` is
    /// used when it is `None`. An existing deadline is never shortened.
    fn on_rate_limited(&self, retry_after: Option<Duration>) {
        let delay = retry_after.unwrap_or(Duration::from_secs(DEFAULT_RATE_LIMIT_DELAY_SECS));
        let now = tokio::time::Instant::now();
        let new_deadline = now + delay;
        let mut guard = self.blocked_until.lock();
        // Expired deadlines are never cleared, so only a deadline that is still
        // in the future counts as an active cooldown. Without this check every
        // rate limit after the first would be logged as an "extension" and the
        // warning for entering degradation mode would fire only once per process.
        let active = (*guard).filter(|existing| *existing > now);
        let was_blocked = active.is_some();
        *guard = Some(active.map_or(new_deadline, |existing| existing.max(new_deadline)));
        if was_blocked {
            debug!(
                delay_secs = delay.as_secs(),
                "BinanceSpot rate-limit cooldown extended"
            );
        } else {
            warn!(
                delay_secs = delay.as_secs(),
                "BinanceSpot entering rate-limit degradation mode"
            );
        }
    }
}
/// Reports whether any error in the cause chain of `e` looks like a rate-limit
/// rejection.
///
/// Detection is textual: each cause's message is checked for two known phrases
/// and, via `contains_error_code`, for the codes `-1015` and `-1003`.
fn is_rate_limit_error(e: &anyhow::Error) -> bool {
    e.chain().any(|cause| {
        let text = cause.to_string();
        text.contains("Too many requests")
            || text.contains("been banned for exceeding rate limits")
            || contains_error_code(&text, "-1015")
            || contains_error_code(&text, "-1003")
    })
}
/// Reports whether `e` is a `WebsocketError::ResponseError`, i.e. an error
/// response returned for the request rather than some other failure.
fn is_api_rejection_error(e: &anyhow::Error) -> bool {
    matches!(
        e.downcast_ref::<WebsocketError>(),
        Some(WebsocketError::ResponseError { .. })
    )
}
/// Bounded exponential backoff used between reconnect attempts.
struct ExponentialBackoff {
    /// Number of waits performed since the last reset.
    attempt: u32,
    /// Maximum number of waits before giving up.
    max_attempts: u32,
    /// Delay of the first wait, in milliseconds.
    initial_ms: u64,
    /// Cap applied to every computed delay, in milliseconds.
    max_ms: u64,
}

impl ExponentialBackoff {
    /// Creates a backoff configured from the module-level constants.
    fn new() -> Self {
        Self {
            attempt: 0,
            max_attempts: MAX_RECONNECT_ATTEMPTS,
            initial_ms: INITIAL_BACKOFF_MS,
            max_ms: MAX_BACKOFF_MS,
        }
    }

    /// Restarts the sequence from the initial delay.
    fn reset(&mut self) {
        self.attempt = 0;
    }

    /// Sleeps for the next delay in the sequence.
    ///
    /// Returns `false` immediately, without sleeping, once `max_attempts`
    /// waits have been used; otherwise sleeps and returns `true`. The delay is
    /// `initial_ms * 2^attempt`, saturating on overflow and capped at `max_ms`.
    async fn wait(&mut self) -> bool {
        let exhausted = self.attempt >= self.max_attempts;
        if exhausted {
            return false;
        }

        let multiplier = 2u64.saturating_pow(self.attempt);
        let delay_ms = self.initial_ms.saturating_mul(multiplier).min(self.max_ms);
        self.attempt += 1;

        debug!(
            attempt = self.attempt,
            max = self.max_attempts,
            delay_ms,
            "BinanceSpot reconnect backoff"
        );
        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
        true
    }
}
/// Credentials and environment selection for the Binance Spot client.
///
/// The credential fields are private, and the `Debug` implementation prints a
/// placeholder instead of their values.
#[derive(Clone, Deserialize)]
pub struct BinanceSpotConfig {
    api_key: String,
    secret_key: String,
    /// When `true`, the client is built against the testnet endpoints.
    pub testnet: bool,
}

impl std::fmt::Debug for BinanceSpotConfig {
    /// Formats the config with both credentials replaced by `"***"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "***";
        let mut out = f.debug_struct("BinanceSpotConfig");
        out.field("api_key", &REDACTED);
        out.field("secret_key", &REDACTED);
        out.field("testnet", &self.testnet);
        out.finish()
    }
}

impl BinanceSpotConfig {
    /// Builds a config from an API key, its secret, and the testnet flag.
    pub fn new(api_key: String, secret_key: String, testnet: bool) -> Self {
        Self {
            api_key,
            secret_key,
            testnet,
        }
    }

    /// Returns the API key.
    pub fn api_key(&self) -> &str {
        self.api_key.as_str()
    }
}
/// Binance Spot execution client.
///
/// Cloning is cheap with respect to shared state: the config, REST client,
/// cached WebSocket session and rate limiter are all held behind `Arc`.
#[derive(Clone)]
pub struct BinanceSpot {
// Credentials and testnet flag the client was built with.
config: Arc<BinanceSpotConfig>,
// REST client used for snapshots, open orders and trade history.
rest: Arc<RestApi>,
// Factory used to open new WebSocket API sessions.
ws_handle: WebsocketApiHandle,
// Connected session reused by `open_order` / `cancel_order`; `None` until
// first use and after `clear_ws_api`.
ws_api: Arc<RwLock<Option<WebsocketApi>>>,
// Cooldown shared by all requests issued through this client and its clones.
rate_limiter: Arc<RateLimitTracker>,
}
impl std::fmt::Debug for BinanceSpot {
// Prints only the testnet flag; all other fields are omitted.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BinanceSpot")
.field("testnet", &self.config.testnet)
.finish_non_exhaustive()
}
}
impl BinanceSpot {
#[allow(clippy::expect_used)] fn build_rest(config: &BinanceSpotConfig) -> Arc<RestApi> {
let rest_config = ConfigurationRestApi::builder()
.api_key(config.api_key.clone())
.api_secret(config.secret_key.clone())
.build()
.expect("failed to build Binance REST configuration");
Arc::new(if config.testnet {
SpotRestApi::testnet(rest_config)
} else {
SpotRestApi::production(rest_config)
})
}
#[allow(clippy::expect_used)] fn build_ws_handle(config: &BinanceSpotConfig) -> WebsocketApiHandle {
let ws_config = ConfigurationWebsocketApi::builder()
.api_key(config.api_key.clone())
.api_secret(config.secret_key.clone())
.build()
.expect("failed to build Binance WebSocket configuration");
if config.testnet {
SpotWsApi::testnet(ws_config)
} else {
SpotWsApi::production(ws_config)
}
}
async fn get_ws_api(&self) -> anyhow::Result<WebsocketApi> {
{
let guard = self.ws_api.read().await;
if let Some(ref ws) = *guard {
return Ok(ws.clone());
}
}
let mut guard = self.ws_api.write().await;
if let Some(ref ws) = *guard {
return Ok(ws.clone());
}
let ws = tokio::time::timeout(
Duration::from_secs(CONNECT_TIMEOUT_SECS),
self.ws_handle.connect(),
)
.await
.map_err(|_| {
anyhow::anyhow!("BinanceSpot WS connect timed out after {CONNECT_TIMEOUT_SECS}s")
})??;
*guard = Some(ws.clone());
Ok(ws)
}
async fn clear_ws_api(&self) {
let ws = {
let mut guard = self.ws_api.write().await;
guard.take()
};
if let Some(ws) = ws {
match tokio::time::timeout(Duration::from_secs(5), ws.disconnect()).await {
Ok(Err(e)) => warn!(%e, "BinanceSpot failed to disconnect stale WS session"),
Err(_) => warn!("BinanceSpot WS disconnect timed out (5s)"),
Ok(Ok(())) => {}
}
}
}
}
/// Runs a REST call, retrying when it fails with a rate-limit error.
///
/// Before every attempt the shared cooldown in `rate_limiter` is awaited.
/// A rate-limit failure records a cooldown of `min(2^attempt, 30)` seconds and
/// retries, up to `MAX_RATE_LIMIT_RETRIES` retries. Any other error, or a
/// rate-limit error on the final attempt, is returned to the caller.
///
/// `make_call` is invoked once per attempt with a fresh `Arc` to the REST client.
async fn rest_call_with_retry<T>(
    rest: &Arc<RestApi>,
    rate_limiter: &RateLimitTracker,
    mut make_call: impl FnMut(
        Arc<RestApi>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<T>> + Send>,
    >,
) -> anyhow::Result<T> {
    for attempt in 0..=MAX_RATE_LIMIT_RETRIES {
        rate_limiter.wait_if_blocked().await;

        let failure = match make_call(Arc::clone(rest)).await {
            Ok(value) => return Ok(value),
            Err(failure) => failure,
        };

        let retryable = is_rate_limit_error(&failure) && attempt < MAX_RATE_LIMIT_RETRIES;
        if !retryable {
            return Err(failure);
        }

        let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
        warn!(
            attempt = attempt + 1,
            max = MAX_RATE_LIMIT_RETRIES,
            delay_secs = delay.as_secs(),
            "BinanceSpot REST rate-limited, retrying"
        );
        rate_limiter.on_rate_limited(Some(delay));
    }
    // The final iteration always returns: its error is never retryable.
    unreachable!("BinanceSpot REST retries exhausted: loop invariant violated")
}
/// Fetches the open orders of one instrument over REST.
///
/// Returns the instrument together with its converted orders so results can be
/// matched back to instruments when many fetches run concurrently. Orders that
/// `convert_open_order` cannot convert are dropped.
///
/// # Errors
/// Returns a connectivity error if the request or the response decoding fails.
async fn fetch_open_orders_for_instrument(
    rest: Arc<RestApi>,
    rate_limiter: Arc<RateLimitTracker>,
    instrument: InstrumentNameExchange,
) -> Result<
    (
        InstrumentNameExchange,
        Vec<Order<ExchangeId, InstrumentNameExchange, Open>>,
    ),
    UnindexedClientError,
> {
    let symbol = instrument.name().to_string();

    let response = rest_call_with_retry(&rest, &rate_limiter, |api| {
        // Each attempt needs its own owned copy of the symbol.
        let symbol = symbol.clone();
        Box::pin(async move {
            let params = GetOpenOrdersParams::builder().symbol(symbol).build()?;
            api.get_open_orders(params).await
        })
    })
    .await
    .map_err(connectivity_error)?;

    let raw_orders = response
        .data()
        .await
        .map_err(|e| connectivity_error(e.into()))?;

    let mut orders = Vec::new();
    for raw in raw_orders {
        if let Some(order) = convert_open_order(&raw, &instrument) {
            orders.push(order);
        }
    }
    Ok((instrument, orders))
}
/// Fetches all raw trades of `instrument` starting at `start_time_ms`,
/// following pagination until a short page is returned.
///
/// The first request filters by start time. Every later request continues from
/// the id after the last trade of the previous page. Pagination stops when a
/// page has fewer than `BINANCE_MAX_TRADES` entries, when the last trade has no
/// id, or when the next id would overflow.
///
/// # Errors
/// Returns a connectivity error if any request or response decoding fails.
async fn paginate_my_trades(
    rest: &Arc<RestApi>,
    rate_limiter: &Arc<RateLimitTracker>,
    instrument: &InstrumentNameExchange,
    start_time_ms: i64,
) -> Result<Vec<binance_sdk::spot::rest_api::MyTradesResponseInner>, UnindexedClientError> {
    let symbol = instrument.name().to_string();
    let mut collected = Vec::new();
    let mut next_from_id: Option<i64> = None;

    loop {
        let from_id = next_from_id;
        let response = rest_call_with_retry(rest, rate_limiter, |api| {
            let symbol = symbol.clone();
            Box::pin(async move {
                #[allow(clippy::cast_possible_truncation)]
                let builder = MyTradesParams::builder(symbol).limit(BINANCE_MAX_TRADES as i32);
                // Continue by id when a cursor exists, otherwise start by time.
                let params = match from_id {
                    Some(id) => builder.from_id(id).build()?,
                    None => builder.start_time(start_time_ms).build()?,
                };
                api.my_trades(params).await
            })
        })
        .await
        .map_err(connectivity_error)?;

        let page = response
            .data()
            .await
            .map_err(|e| connectivity_error(e.into()))?;

        let page_len = page.len();
        let last_id = page.last().and_then(|t| t.id);
        collected.extend(page);

        // A short page means there is nothing further to fetch.
        if page_len < BINANCE_MAX_TRADES {
            break;
        }

        let Some(id) = last_id else {
            warn!(%instrument, "BinanceSpot paginate_my_trades: trade missing ID, stopping pagination");
            break;
        };
        debug!(%instrument, "BinanceSpot paginate_my_trades: fetching next page ({page_len} results)");
        let Some(next) = id.checked_add(1) else {
            break;
        };
        next_from_id = Some(next);
    }

    Ok(collected)
}
impl ExecutionClient for BinanceSpot {
const EXCHANGE: ExchangeId = ExchangeId::BinanceSpot;
type Config = BinanceSpotConfig;
type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
/// Builds the REST client and WebSocket handle from `config`.
///
/// No network connection is made here; the WebSocket session is opened lazily
/// on first use.
fn new(config: Self::Config) -> Self {
    Self {
        rest: Self::build_rest(&config),
        ws_handle: Self::build_ws_handle(&config),
        ws_api: Arc::new(RwLock::new(None)),
        rate_limiter: Arc::new(RateLimitTracker::new()),
        // Moved last so the builders above can still borrow it.
        config: Arc::new(config),
    }
}
/// Builds an account snapshot from the REST API.
///
/// Balances come from a single account request filtered by `assets`. Open
/// orders are fetched per instrument with up to eight requests in flight, so
/// the instrument snapshots are in completion order rather than input order.
///
/// # Errors
/// Returns the first connectivity error raised by any request.
async fn account_snapshot(
    &self,
    assets: &[AssetNameExchange],
    instruments: &[InstrumentNameExchange],
) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
    use futures::{StreamExt as _, TryStreamExt};

    let response = rest_call_with_retry(&self.rest, &self.rate_limiter, |api| {
        Box::pin(async move {
            let params = GetAccountParams::builder().build()?;
            api.get_account(params).await
        })
    })
    .await
    .map_err(connectivity_error)?;
    let account = response
        .data()
        .await
        .map_err(|e| connectivity_error(e.into()))?;
    let balances = filter_and_convert_balances(account.balances.unwrap_or_default(), assets);

    let order_fetches = instruments.iter().cloned().map(|instrument| {
        fetch_open_orders_for_instrument(
            self.rest.clone(),
            self.rate_limiter.clone(),
            instrument,
        )
    });
    let instrument_snapshots: Vec<_> = futures::stream::iter(order_fetches)
        .buffer_unordered(8)
        .map(|fetched| {
            fetched.map(|(instrument, open_orders)| {
                // Re-wrap each `Open` order in the general active order state.
                let orders = open_orders
                    .into_iter()
                    .map(|order| Order {
                        key: order.key,
                        side: order.side,
                        price: order.price,
                        quantity: order.quantity,
                        kind: order.kind,
                        time_in_force: order.time_in_force,
                        state: OrderState::active(order.state),
                    })
                    .collect();
                InstrumentAccountSnapshot::new(instrument, orders, None)
            })
        })
        .try_collect()
        .await?;

    Ok(AccountSnapshot::new(
        ExchangeId::BinanceSpot,
        balances,
        instrument_snapshots,
    ))
}
async fn account_stream(
&self,
_assets: &[AssetNameExchange],
instruments: &[InstrumentNameExchange],
) -> Result<Self::AccountStream, UnindexedClientError> {
let (tx, rx) = mpsc::unbounded_channel::<UnindexedAccountEvent>();
let dedup = new_dedup_cache();
let ws_handle = self.ws_handle.clone();
let rest = self.rest.clone();
let rate_limiter = self.rate_limiter.clone();
let instruments = instruments.to_vec();
debug_assert!(
instruments.iter().all(|i| i.name().len() <= 23),
"instrument name exceeds SmolStr inline capacity: {:?}",
instruments.iter().find(|i| i.name().len() > 23)
);
let initial_ws = ws_handle.connect().await.map_err(|e| {
UnindexedClientError::Connectivity(ConnectivityError::Socket(e.to_string()))
})?;
#[allow(clippy::expect_used)] let params = UserDataStreamSubscribeSignatureParams::builder()
.build()
.expect("UserDataStreamSubscribeSignatureParams has no required fields");
match initial_ws
.user_data_stream_subscribe_signature(params)
.await
{
Ok(_) => {}
Err(e) => {
match tokio::time::timeout(Duration::from_secs(5), initial_ws.disconnect()).await {
Ok(Err(de)) => {
warn!(%de, "BinanceSpot failed to disconnect WS after subscribe failure")
}
Err(_) => {
warn!("BinanceSpot WS disconnect timed out (5s) after subscribe failure")
}
Ok(Ok(())) => {}
}
return Err(UnindexedClientError::Internal(e.to_string()));
}
}
let cm_handle = tokio::spawn(connection_manager(
tx,
dedup,
ws_handle,
rest,
rate_limiter,
instruments,
Some(initial_ws),
));
let rx_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
let guarded_stream = AbortOnDropStream::new(rx_stream, cm_handle);
Ok(futures::StreamExt::boxed(guarded_stream))
}
/// Cancels an order over the WebSocket API.
///
/// The order is addressed by exchange order id when the request carries one
/// that parses as `i64`, and by client order id otherwise. Always returns
/// `Some`; failures are reported in the response's `state`:
/// - `Connectivity(Socket)` when no session could be obtained or the call
///   failed for a reason that is neither an API rejection nor a rate limit
///   (the cached session is cleared in the latter case),
/// - `Rejected(OrderRejected)` when params cannot be built, the response
///   cannot be decoded, or it lacks an order id,
/// - the error from `parse_binance_api_error` for API rejections,
/// - `ApiError::RateLimit` for rate-limit errors, which also start a cooldown.
async fn cancel_order(
&self,
request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
) -> Option<UnindexedOrderResponseCancel> {
let instrument = request.key.instrument.clone();
// Owned copy of the request key, returned in every response.
let key = OrderKey {
exchange: request.key.exchange,
instrument: instrument.clone(),
strategy: request.key.strategy.clone(),
cid: request.key.cid.clone(),
};
let ws = match self.get_ws_api().await {
Ok(ws) => ws,
Err(e) => {
return Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::Connectivity(
ConnectivityError::Socket(format!("{e:#}")),
)),
});
}
};
let mut params_builder =
OrderCancelParams::builder(request.key.instrument.name().to_string());
// Prefer the exchange order id; fall back to the client order id when it is
// absent or not numeric.
if let Some(ref order_id) = request.state.id {
if let Ok(id) = order_id.0.parse::<i64>() {
params_builder = params_builder.order_id(id);
} else {
error!(
order_id = %order_id.0,
"BinanceSpot cancel: exchange orderId not parseable as i64, falling back to clientOrderId"
);
params_builder = params_builder.orig_client_order_id(request.key.cid.0.to_string());
}
} else {
params_builder = params_builder.orig_client_order_id(request.key.cid.0.to_string());
}
let params = match params_builder.build() {
Ok(p) => p,
Err(e) => {
error!(%e, "BinanceSpot failed to build cancel order params");
return Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
});
}
};
match ws.order_cancel(params).await {
Ok(response) => match response.data() {
Ok(data) => {
// Use the local clock when the exchange timestamp is missing or invalid.
let time_exchange = data
.transact_time
.and_then(|ms| Utc.timestamp_millis_opt(ms).single())
.unwrap_or_else(Utc::now);
let exchange_order_id = match data.order_id {
Some(id) => OrderId(format_smolstr!("{id}")),
None => {
error!("BinanceSpot cancel response missing orderId");
return Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
"cancel response missing orderId".into(),
))),
});
}
};
// A missing or unparseable executed quantity is treated as zero.
let filled_qty = data
.executed_qty
.as_deref()
.and_then(|q| Decimal::from_str(q).ok())
.unwrap_or(Decimal::ZERO);
Some(UnindexedOrderResponseCancel {
key,
state: Ok(Cancelled::new(exchange_order_id, time_exchange, filled_qty)),
})
}
Err(e) => {
Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
})
}
},
Err(e) => {
// Classification order matters: API rejections are checked before rate
// limits, and only unclassified failures reset the cached session.
if is_api_rejection_error(&e) {
let api_err = parse_binance_api_error(e.to_string(), &instrument);
Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::from(api_err)),
})
} else if is_rate_limit_error(&e) {
self.rate_limiter.on_rate_limited(None);
Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::from(ApiError::RateLimit)),
})
} else {
self.clear_ws_api().await;
Some(UnindexedOrderResponseCancel {
key,
state: Err(UnindexedOrderError::Connectivity(
ConnectivityError::Socket(format!("{e:#}")),
)),
})
}
}
}
}
/// Places an order over the WebSocket API.
///
/// Always returns `Some`; the outcome is carried in the order's `state`:
/// - fully filled when the reported executed quantity is at least the
///   requested quantity, otherwise active/open,
/// - inactive with `Connectivity(Socket)` when no session could be obtained or
///   the call failed for a reason that is neither an API rejection nor a rate
///   limit (the cached session is cleared in the latter case),
/// - inactive with `UnsupportedOrderType` when `convert_order_kind_tif` has no
///   mapping for the kind / time-in-force pair,
/// - inactive with `Rejected(OrderRejected)` when params cannot be built, the
///   response cannot be decoded, or it lacks an order id,
/// - inactive with the error from `parse_binance_api_error` for API rejections,
/// - inactive with `ApiError::RateLimit` for rate-limit errors, which also
///   start a cooldown.
async fn open_order(
&self,
request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
let instrument = request.key.instrument.clone();
let side = request.state.side;
let price = request.state.price;
let quantity = request.state.quantity;
let kind = request.state.kind;
let time_in_force = request.state.time_in_force;
let cid = request.key.cid.clone();
// The returned key always carries `ExchangeId::BinanceSpot`.
let order_key = OrderKey::new(
ExchangeId::BinanceSpot,
instrument.clone(),
request.key.strategy.clone(),
cid.clone(),
);
let ws = match self.get_ws_api().await {
Ok(ws) => ws,
Err(e) => {
return Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::Connectivity(
ConnectivityError::Socket(format!("{e:#}")),
)),
});
}
};
let binance_side = match side {
Side::Buy => OrderPlaceSideEnum::Buy,
Side::Sell => OrderPlaceSideEnum::Sell,
};
let (binance_type, binance_tif) = match convert_order_kind_tif(kind, time_in_force) {
Some(converted) => converted,
None => {
return Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::UnsupportedOrderType(format!(
"Binance Spot does not yet support OrderKind::{kind:?}"
))),
});
}
};
let mut params_builder =
OrderPlaceParams::builder(instrument.name().to_string(), binance_side, binance_type)
.quantity(quantity)
.new_client_order_id(cid.0.to_string());
// Price is only sent for limit orders.
if matches!(kind, OrderKind::Limit) {
params_builder = params_builder.price(price);
}
if let Some(tif) = binance_tif {
params_builder = params_builder.time_in_force(tif);
}
let params = match params_builder.build() {
Ok(p) => p,
Err(e) => {
error!(%e, "BinanceSpot failed to build new order params");
return Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
});
}
};
match ws.order_place(params).await {
Ok(response) => match response.data() {
Ok(data) => {
// Use the local clock when the exchange timestamp is missing or invalid.
let time_exchange = data
.transact_time
.and_then(|ms| Utc.timestamp_millis_opt(ms).single())
.unwrap_or_else(Utc::now);
let exchange_order_id = match data.order_id {
Some(id) => OrderId(format_smolstr!("{id}")),
None => {
error!("BinanceSpot open_order response missing orderId");
return Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::Rejected(
ApiError::OrderRejected(
"open_order response missing orderId".into(),
),
)),
});
}
};
// A missing or unparseable executed quantity is treated as zero.
let filled_qty = data
.executed_qty
.as_deref()
.and_then(|q| Decimal::from_str(q).ok())
.unwrap_or(Decimal::ZERO);
// Executed quantity covering the request means the order is already done.
let state = if filled_qty >= quantity {
OrderState::fully_filled(Filled::new(
exchange_order_id,
time_exchange,
filled_qty,
None, ))
} else {
OrderState::active(Open::new(exchange_order_id, time_exchange, filled_qty))
};
Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state,
})
}
Err(e) => {
Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
})
}
},
Err(e) => {
// Classification order matters: API rejections are checked before rate
// limits, and only unclassified failures reset the cached session.
if is_api_rejection_error(&e) {
let api_err = parse_binance_api_error(e.to_string(), &instrument);
Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::from(api_err)),
})
} else if is_rate_limit_error(&e) {
self.rate_limiter.on_rate_limited(None);
Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::from(ApiError::RateLimit)),
})
} else {
self.clear_ws_api().await;
Some(Order {
key: order_key,
side,
price,
quantity,
kind,
time_in_force,
state: OrderState::inactive(OrderError::Connectivity(
ConnectivityError::Socket(format!("{e:#}")),
)),
})
}
}
}
}
/// Fetches account balances over REST, filtered by `assets`.
///
/// An empty `assets` slice returns every balance that can be converted.
///
/// # Errors
/// Returns a connectivity error if the request or the response decoding fails.
async fn fetch_balances(
    &self,
    assets: &[AssetNameExchange],
) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
    let response = rest_call_with_retry(&self.rest, &self.rate_limiter, |api| {
        Box::pin(async move {
            let params = GetAccountParams::builder().build()?;
            api.get_account(params).await
        })
    })
    .await
    .map_err(connectivity_error)?;

    let account = response
        .data()
        .await
        .map_err(|e| connectivity_error(e.into()))?;

    let raw_balances = account.balances.unwrap_or_default();
    Ok(filter_and_convert_balances(raw_balances, assets))
}
/// Fetches the open orders of all `instruments` over REST.
///
/// Up to eight instruments are queried concurrently; orders are returned
/// grouped by instrument in request-completion order.
///
/// # Errors
/// Returns the first connectivity error raised by any request.
async fn fetch_open_orders(
    &self,
    instruments: &[InstrumentNameExchange],
) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
    use futures::{StreamExt as _, TryStreamExt as _};

    let fetches = instruments.iter().cloned().map(|instrument| {
        fetch_open_orders_for_instrument(
            self.rest.clone(),
            self.rate_limiter.clone(),
            instrument,
        )
    });
    let per_instrument: Vec<_> = futures::stream::iter(fetches)
        .buffer_unordered(8)
        .try_collect()
        .await?;

    Ok(per_instrument
        .into_iter()
        .flat_map(|(_, orders)| orders)
        .collect())
}
/// Fetches all trades of `instruments` executed since `time_since`.
///
/// Each instrument is paginated independently with up to eight instruments in
/// flight. Raw trades that `convert_my_trade` cannot convert are dropped. An
/// empty `instruments` slice returns an empty result without any request.
///
/// # Errors
/// Returns the first connectivity error raised by any instrument.
#[allow(clippy::redundant_iter_cloned)]
async fn fetch_trades(
    &self,
    time_since: DateTime<Utc>,
    instruments: &[InstrumentNameExchange],
) -> Result<Vec<Trade<AssetNameExchange, InstrumentNameExchange>>, UnindexedClientError> {
    use futures::StreamExt;

    if instruments.is_empty() {
        debug!(
            "BinanceSpot fetch_trades called with empty instruments slice — returning empty result"
        );
        return Ok(Vec::new());
    }

    let start_time_ms = time_since.timestamp_millis();
    let mut pending = futures::stream::iter(instruments.iter().cloned().map(|instrument| {
        let rest = self.rest.clone();
        let rate_limiter = self.rate_limiter.clone();
        async move {
            let raw =
                paginate_my_trades(&rest, &rate_limiter, &instrument, start_time_ms).await?;
            Ok::<_, UnindexedClientError>((instrument, raw))
        }
    }))
    .buffer_unordered(8);

    let mut all_trades = Vec::new();
    while let Some(outcome) = pending.next().await {
        let (instrument, raw_trades) = outcome?;
        all_trades.extend(
            raw_trades
                .iter()
                .filter_map(|raw| convert_my_trade(raw, &instrument)),
        );
    }
    Ok(all_trades)
}
}
async fn connect_and_subscribe(ws_handle: &WebsocketApiHandle) -> anyhow::Result<WebsocketApi> {
let ws = ws_handle.connect().await?;
#[allow(clippy::expect_used)] let params = UserDataStreamSubscribeSignatureParams::builder()
.build()
.expect("UserDataStreamSubscribeSignatureParams has no required fields");
match ws.user_data_stream_subscribe_signature(params).await {
Ok(_) => Ok(ws),
Err(e) => {
warn!(%e, "BinanceSpot WS subscribe failed, cleaning up connection");
let ws_cleanup = ws;
tokio::spawn(async move {
match tokio::time::timeout(Duration::from_secs(5), ws_cleanup.disconnect()).await {
Ok(Err(dc_err)) => warn!(%dc_err, "BinanceSpot WS cleanup disconnect failed"),
Err(_) => warn!("BinanceSpot WS cleanup disconnect timed out (5s)"),
Ok(Ok(())) => {}
}
});
Err(e)
}
}
}
#[allow(clippy::cognitive_complexity)]
async fn connection_manager(
tx: mpsc::UnboundedSender<UnindexedAccountEvent>,
dedup: SharedDedupCache,
ws_handle: WebsocketApiHandle,
rest: Arc<RestApi>,
rate_limiter: Arc<RateLimitTracker>,
instruments: Vec<InstrumentNameExchange>,
initial_ws: Option<WebsocketApi>,
) {
let mut backoff = ExponentialBackoff::new();
let mut disconnect_time: Option<DateTime<Utc>> = None;
let mut current_ws = initial_ws;
loop {
let ws = match current_ws.take() {
Some(ws) => ws,
None => match connect_and_subscribe(&ws_handle).await {
Ok(ws) => ws,
Err(e) => {
error!(%e, "BinanceSpot WS connect/subscribe failed");
if !backoff.wait().await {
error!("BinanceSpot max reconnect attempts exhausted");
break;
}
continue;
}
},
};
info!("BinanceSpot account_stream connected and subscribed");
let (signal_tx, signal_rx) = oneshot::channel::<()>();
let mut signal_tx_opt = Some(signal_tx);
let heartbeat_flag = Arc::new(AtomicBool::new(true));
let hb_callback = heartbeat_flag.clone();
let dedup_callback = dedup.clone();
let mut event_tx = Some(tx.clone());
let mut event_buf = Vec::with_capacity(32);
let subscription = ws.subscribe_on_ws_events(move |event| {
let Some(ref sender) = event_tx else { return };
match event {
WebsocketEvent::Message(json_str) => {
hb_callback.store(true, Ordering::Release);
match serde_json::from_str::<UserDataStreamEventsResponse>(&json_str) {
Ok(user_event) => {
let stream_terminated = convert_user_data_events(user_event, &mut event_buf);
for ev in event_buf.drain(..) {
if let Some(key) = dedup_key_from_event(&ev)
&& is_duplicate(&dedup_callback, key)
{
trace!("BinanceSpot dedup: skipping duplicate event");
continue;
}
if sender.send(ev).is_err() {
warn!("BinanceSpot account_stream receiver dropped, suppressing further sends");
event_tx.take();
if let Some(s) = signal_tx_opt.take() {
let _ = s.send(());
}
return;
}
}
if stream_terminated {
event_tx.take();
if let Some(s) = signal_tx_opt.take() {
let _ = s.send(());
}
}
}
Err(e) => {
trace!(
error = %e,
raw = %json_str.get(..200).unwrap_or(json_str.as_str()),
"BinanceSpot WS: skipped non-UserDataStream message"
);
}
}
}
WebsocketEvent::Ping | WebsocketEvent::Pong => {
hb_callback.store(true, Ordering::Release);
}
WebsocketEvent::Error(e) => {
warn!(%e, "BinanceSpot WebSocket error, will attempt reconnect");
event_tx.take();
if let Some(s) = signal_tx_opt.take() {
let _ = s.send(());
}
}
WebsocketEvent::Close(code, reason) => {
warn!(code, %reason, "BinanceSpot WebSocket closed");
event_tx.take();
if let Some(s) = signal_tx_opt.take() {
let _ = s.send(());
}
}
_ => {}
}
});
if let Some(dt) = disconnect_time.take() {
match tokio::time::timeout(
Duration::from_secs(FILL_RECOVERY_TIMEOUT_SECS),
recover_fills(&rest, &rate_limiter, &instruments, dt, &tx, &dedup),
)
.await
{
Ok(()) => {}
Err(_) => {
warn!(
timeout_secs = FILL_RECOVERY_TIMEOUT_SECS,
"BinanceSpot fill recovery timed out — remaining instruments not queried, some fills may be missing"
);
}
}
}
enum DisconnectReason {
Signal,
HeartbeatTimeout,
ConsumerDropped,
}
let reason = {
let mut signal_rx = signal_rx;
loop {
tokio::select! {
_ = tx.closed() => {
debug!("BinanceSpot account_stream consumer dropped, terminating");
break DisconnectReason::ConsumerDropped;
}
_ = &mut signal_rx => {
warn!("BinanceSpot WS disconnected, will attempt reconnect");
break DisconnectReason::Signal;
}
_ = tokio::time::sleep(Duration::from_secs(HEARTBEAT_TIMEOUT_SECS)) => {
if heartbeat_flag.swap(false, Ordering::AcqRel) {
backoff.reset();
continue;
}
warn!("BinanceSpot heartbeat timeout ({}s), will attempt reconnect", HEARTBEAT_TIMEOUT_SECS);
break DisconnectReason::HeartbeatTimeout;
}
}
}
};
let should_reconnect = !matches!(reason, DisconnectReason::ConsumerDropped);
if should_reconnect {
disconnect_time = Some(match reason {
DisconnectReason::HeartbeatTimeout => {
Utc::now()
- chrono::Duration::seconds(HEARTBEAT_TIMEOUT_SECS as i64)
- chrono::Duration::milliseconds(SIGNAL_RECOVERY_LOOKBACK_MS)
}
_ => Utc::now() - chrono::Duration::milliseconds(SIGNAL_RECOVERY_LOOKBACK_MS),
});
}
subscription.unsubscribe();
if let Err(e) = ws.disconnect().await {
warn!(%e, "BinanceSpot failed to disconnect WebSocket");
}
if !should_reconnect || tx.is_closed() {
debug!("BinanceSpot connection manager exiting");
break;
}
if !backoff.wait().await {
error!("BinanceSpot max reconnect attempts exhausted, stream terminating");
break;
}
}
}
/// Replays trades executed since `disconnect_time` into `tx` after a reconnect.
///
/// Trades are fetched per instrument over REST with up to eight instruments in
/// flight. Trades already present in `dedup` are skipped. A failed instrument
/// is logged and counted but does not stop recovery of the others. Returns
/// early if the consumer side of `tx` has gone away.
#[allow(clippy::redundant_iter_cloned)]
async fn recover_fills(
    rest: &Arc<RestApi>,
    rate_limiter: &Arc<RateLimitTracker>,
    instruments: &[InstrumentNameExchange],
    disconnect_time: DateTime<Utc>,
    tx: &mpsc::UnboundedSender<UnindexedAccountEvent>,
    dedup: &SharedDedupCache,
) {
    use futures::StreamExt;

    if instruments.is_empty() {
        debug!(
            "BinanceSpot recover_fills called with empty instruments slice — no fills will be recovered"
        );
        return;
    }
    info!(
        since = %disconnect_time,
        instruments = instruments.len(),
        "BinanceSpot recovering fills after reconnect"
    );

    let start_time_ms = disconnect_time.timestamp_millis();
    let mut recovered = 0u32;
    let mut duplicates = 0u32;
    let mut failed_instruments = 0u32;

    // Each future yields `Some(trades)` on success and `None` on a REST failure.
    let mut pending = futures::stream::iter(instruments.iter().cloned().map(|inst| {
        let rest = rest.clone();
        let rl = rate_limiter.clone();
        async move {
            match paginate_my_trades(&rest, &rl, &inst, start_time_ms).await {
                Ok(raw) => Some(
                    raw.into_iter()
                        .filter_map(|t| convert_my_trade(&t, &inst))
                        .collect::<Vec<_>>(),
                ),
                Err(e) => {
                    warn!(%e, %inst, "BinanceSpot fill recovery: REST request failed");
                    None
                }
            }
        }
    }))
    .buffer_unordered(8);

    while let Some(outcome) = pending.next().await {
        let Some(trades) = outcome else {
            failed_instruments += 1;
            continue;
        };
        for trade in trades {
            let event =
                UnindexedAccountEvent::new(ExchangeId::BinanceSpot, AccountEventKind::Trade(trade));
            let already_seen =
                dedup_key_from_event(&event).is_some_and(|key| is_duplicate(dedup, key));
            if already_seen {
                duplicates += 1;
                continue;
            }
            if tx.send(event).is_err() {
                debug!("BinanceSpot fill recovery: consumer dropped during recovery");
                return;
            }
            recovered += 1;
        }
    }

    if failed_instruments == 0 {
        info!(recovered, duplicates, "BinanceSpot fill recovery complete");
    } else {
        error!(
            recovered,
            duplicates,
            failed_instruments,
            "BinanceSpot fill recovery complete with failures — some fills may be permanently missed"
        );
    }
}
/// Converts one raw balance entry into an [`AssetBalance`] stamped with `now`.
///
/// The balance total is `free + locked` and the available amount is `free`.
/// Returns `None` when the entry has no asset name, or, after logging a
/// warning, when `free` or `locked` is missing or not a valid decimal.
fn convert_balance_entry(
    b: binance_sdk::spot::rest_api::GetAccountResponseBalancesInner,
    now: chrono::DateTime<Utc>,
) -> Option<AssetBalance<AssetNameExchange>> {
    let asset_name = AssetNameExchange::new(b.asset.as_deref()?);

    let parse_amount = |raw: Option<&str>| raw.and_then(|text| Decimal::from_str(text).ok());

    let Some(free) = parse_amount(b.free.as_deref()) else {
        warn!(%asset_name, "BinanceSpot balance missing/unparseable 'free' field");
        return None;
    };
    let Some(locked) = parse_amount(b.locked.as_deref()) else {
        warn!(%asset_name, "BinanceSpot balance missing/unparseable 'locked' field");
        return None;
    };

    let balance = Balance::new(free + locked, free);
    Some(AssetBalance::new(asset_name, balance, now))
}
/// Converts REST balance entries, keeping only those whose asset is listed in `assets`.
///
/// An empty `assets` slice means "no filter": every entry is handed to
/// [`convert_balance_entry`]. Otherwise entries without an asset name, or whose name is
/// not requested, are dropped. All results share a single `Utc::now()` timestamp.
fn filter_and_convert_balances(
    balances: Vec<binance_sdk::spot::rest_api::GetAccountResponseBalancesInner>,
    assets: &[AssetNameExchange],
) -> Vec<AssetBalance<AssetNameExchange>> {
    let now = Utc::now();

    // Up to 16 requested assets are checked with a linear scan; larger requests use a set.
    let asset_set: Option<std::collections::HashSet<&str>> =
        (assets.len() > 16).then(|| assets.iter().map(|a| a.name().as_str()).collect());
    let is_requested = |name: &str| match &asset_set {
        Some(set) => set.contains(name),
        None => assets.iter().any(|a| a.name().as_str() == name),
    };

    balances
        .into_iter()
        .filter(|entry| {
            assets.is_empty()
                || entry
                    .asset
                    .as_deref()
                    .is_some_and(|name| is_requested(name))
        })
        .filter_map(|entry| convert_balance_entry(entry, now))
        .collect()
}
/// Converts a REST open-order record into an [`Order`] in the [`Open`] state.
///
/// Returns `None` (after a warning) when the order id, side, original quantity or type is
/// missing or unusable. A missing client order id falls back to the exchange order id, an
/// unparseable or missing executed quantity falls back to zero, a missing time-in-force is
/// read as `"GTC"`, and a missing or invalid timestamp falls back to `Utc::now()`. The
/// strategy is always `StrategyId::unknown()`.
fn convert_open_order(
    o: &binance_sdk::spot::rest_api::AllOrdersResponseInner,
    instrument: &InstrumentNameExchange,
) -> Option<Order<ExchangeId, InstrumentNameExchange, Open>> {
    let Some(order_id_raw) = o.order_id else {
        warn!(%instrument, "BinanceSpot open order missing orderId");
        return None;
    };
    let order_id = OrderId(format_smolstr!("{}", order_id_raw));

    let cid = match o.client_order_id.as_deref() {
        Some(client_id) => ClientOrderId::new(client_id),
        None => {
            warn!(
                %instrument,
                order_id = %order_id_raw,
                "BinanceSpot open order missing clientOrderId, using orderId as fallback — order may not reconcile with engine state"
            );
            // Same text as the formatted exchange order id.
            ClientOrderId::new(order_id.0.as_str())
        }
    };

    let Some(raw_side) = o.side.as_deref() else {
        warn!(%instrument, order_id = %order_id_raw, "BinanceSpot open order missing side");
        return None;
    };
    let side = parse_side(raw_side)?;

    let price = o.price.as_deref().and_then(|raw| Decimal::from_str(raw).ok());

    let Some(quantity) = o
        .orig_qty
        .as_deref()
        .and_then(|raw| Decimal::from_str(raw).ok())
    else {
        warn!(
            %instrument,
            order_id = %order_id_raw,
            "BinanceSpot open order missing/unparseable origQty"
        );
        return None;
    };

    let filled_qty = o.executed_qty.as_deref().map_or(Decimal::ZERO, |raw| {
        Decimal::from_str(raw).unwrap_or_else(|_| {
            warn!(
                %instrument,
                order_id = %order_id_raw,
                executed_qty = raw,
                "BinanceSpot open order unparseable executedQty, defaulting to 0"
            );
            Decimal::ZERO
        })
    });

    let Some(raw_type) = o.r#type.as_deref() else {
        warn!(%instrument, order_id = %order_id_raw, "BinanceSpot open order missing type");
        return None;
    };
    let kind = parse_order_kind(raw_type)?;

    let time_in_force = parse_time_in_force(o.time_in_force.as_deref().unwrap_or("GTC"));

    let time_exchange = o
        .time
        .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
        .unwrap_or_else(|| {
            warn!(
                %instrument,
                order_id = %order_id_raw,
                "BinanceSpot open order missing/unparseable time, using now"
            );
            Utc::now()
        });

    let key = OrderKey::new(
        ExchangeId::BinanceSpot,
        instrument.clone(),
        StrategyId::unknown(),
        cid,
    );
    Some(Order {
        key,
        side,
        price,
        quantity,
        kind,
        time_in_force,
        state: Open::new(order_id, time_exchange, filled_qty),
    })
}
/// Converts a REST `myTrades` record into a [`Trade`] for `instrument`.
///
/// Returns `None` (after a warning) when the trade id, order id, buyer flag, price or
/// quantity is missing or unparseable. A missing or invalid timestamp falls back to
/// `Utc::now()`, a missing commission asset becomes `"UNKNOWN"`, and a missing commission
/// is treated as zero. An unparseable commission is also treated as zero, but is logged so
/// that silently zeroed fees are visible. The strategy is always `StrategyId::unknown()`.
fn convert_my_trade(
    t: &binance_sdk::spot::rest_api::MyTradesResponseInner,
    instrument: &InstrumentNameExchange,
) -> Option<Trade<AssetNameExchange, InstrumentNameExchange>> {
    let Some(trade_id_raw) = t.id else {
        warn!(%instrument, "BinanceSpot trade missing id");
        return None;
    };
    let trade_id = TradeId(format_smolstr!("{}", trade_id_raw));

    let Some(order_id_raw) = t.order_id else {
        warn!(%instrument, trade_id = %trade_id_raw, "BinanceSpot trade missing orderId");
        return None;
    };
    let order_id = OrderId(format_smolstr!("{}", order_id_raw));

    let Some(is_buyer) = t.is_buyer else {
        warn!(%instrument, trade_id = %trade_id_raw, "BinanceSpot trade missing isBuyer");
        return None;
    };
    let side = if is_buyer { Side::Buy } else { Side::Sell };

    let Some(price) = t.price.as_deref().and_then(|raw| Decimal::from_str(raw).ok()) else {
        warn!(
            %instrument,
            trade_id = %trade_id_raw,
            "BinanceSpot trade missing/unparseable price"
        );
        return None;
    };
    let Some(quantity) = t.qty.as_deref().and_then(|raw| Decimal::from_str(raw).ok()) else {
        warn!(
            %instrument,
            trade_id = %trade_id_raw,
            "BinanceSpot trade missing/unparseable qty"
        );
        return None;
    };

    // The fill itself is still valid when the fee is unreadable, so keep the trade and
    // report the problem instead of dropping it or hiding the fallback.
    let commission = match t.commission.as_deref() {
        Some(raw) => Decimal::from_str(raw).unwrap_or_else(|e| {
            warn!(
                %instrument,
                trade_id = %trade_id_raw,
                error = %e,
                raw,
                "BinanceSpot trade unparseable commission, defaulting to 0"
            );
            Decimal::ZERO
        }),
        None => Decimal::ZERO,
    };

    let time_exchange = t
        .time
        .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
        .unwrap_or_else(|| {
            warn!(
                %instrument,
                trade_id = %trade_id_raw,
                "BinanceSpot trade missing/unparseable time, using now"
            );
            Utc::now()
        });

    let fee_asset = t
        .commission_asset
        .as_deref()
        .map(AssetNameExchange::from)
        .unwrap_or_else(|| AssetNameExchange::from("UNKNOWN"));

    Some(Trade::new(
        trade_id,
        order_id,
        instrument.clone(),
        StrategyId::unknown(),
        time_exchange,
        side,
        price,
        quantity,
        AssetFees::new(fee_asset, commission, None),
    ))
}
/// Translates one user-data-stream message into account events appended to `buf`.
///
/// Execution reports and account-position updates are converted; balance updates and any
/// other message are only logged. Returns `true` solely for `EventStreamTerminated`, which
/// is logged as a signal to reconnect.
fn convert_user_data_events(
    event: UserDataStreamEventsResponse,
    buf: &mut Vec<UnindexedAccountEvent>,
) -> bool {
    let mut reconnect = false;
    match event {
        UserDataStreamEventsResponse::ExecutionReport(report) => {
            // At most one event results from a report.
            buf.extend(convert_execution_report(*report));
        }
        UserDataStreamEventsResponse::OutboundAccountPosition(position) => {
            convert_account_position(*position, buf);
        }
        UserDataStreamEventsResponse::BalanceUpdate(_) => {
            debug!("BinanceSpot ignoring BalanceUpdate event");
        }
        UserDataStreamEventsResponse::EventStreamTerminated(_) => {
            warn!("BinanceSpot user data stream terminated by exchange, signalling reconnect");
            reconnect = true;
        }
        _ => {
            trace!("BinanceSpot ignoring unhandled user data event");
        }
    }
    reconnect
}
#[allow(clippy::cognitive_complexity)]
fn convert_execution_report(
report: binance_sdk::spot::websocket_api::ExecutionReport,
) -> Option<UnindexedAccountEvent> {
let exec_type = match report.x.as_deref() {
Some(t) => t,
None => {
warn!("BinanceSpot executionReport missing execution type (x), dropping");
return None;
}
};
let symbol = match report.s.as_deref() {
Some(s) => InstrumentNameExchange::new(s),
None => {
warn!("BinanceSpot executionReport missing symbol (s), dropping");
return None;
}
};
let order_id = match report.i {
Some(id) => OrderId(format_smolstr!("{id}")),
None => {
warn!(%symbol, "BinanceSpot executionReport missing orderId (i), dropping");
return None;
}
};
let cid = match report.c.as_deref() {
Some(c) => ClientOrderId::new(c),
None => ClientOrderId::new(order_id.0.as_str()),
};
let time_exchange = match report
.t_uppercase
.and_then(|ms| Utc.timestamp_millis_opt(ms).single())
{
Some(t) => t,
None => {
warn!(%symbol, "BinanceSpot executionReport missing/unparseable transaction time (T), using now");
Utc::now()
}
};
match exec_type {
"NEW" => convert_new_order(&report, symbol, cid, order_id, time_exchange),
"TRADE" => {
let trade_id = match report.t {
Some(id) => TradeId(format_smolstr!("{id}")),
None => {
warn!(%symbol, "BinanceSpot TRADE event missing trade ID (t), dropping");
return None;
}
};
let side = match report.s_uppercase.as_deref().and_then(parse_side) {
Some(s) => s,
None => {
warn!(%symbol, "BinanceSpot TRADE event missing/unknown side (S), dropping");
return None;
}
};
let last_price = match report.l_uppercase.as_deref() {
Some(s) => match Decimal::from_str(s) {
Ok(v) => v,
Err(e) => {
warn!(%symbol, error = %e, raw = s, "BinanceSpot TRADE event unparseable last price (L), dropping fill");
return None;
}
},
None => {
warn!(%symbol, "BinanceSpot TRADE event missing last price (L), dropping fill");
return None;
}
};
let last_qty = match report.l.as_deref() {
Some(s) => match Decimal::from_str(s) {
Ok(v) => v,
Err(e) => {
warn!(%symbol, error = %e, raw = s, "BinanceSpot TRADE event unparseable last qty (l), dropping fill");
return None;
}
},
None => {
warn!(%symbol, "BinanceSpot TRADE event missing last qty (l), dropping fill");
return None;
}
};
let commission = match Decimal::from_str(report.n.as_deref().unwrap_or("0")) {
Ok(v) => v,
Err(e) => {
warn!(%symbol, error = %e, "BinanceSpot TRADE event unparseable commission (n), defaulting to 0");
Decimal::ZERO
}
};
let fee_asset = report
.n_uppercase
.as_deref()
.map(AssetNameExchange::from)
.unwrap_or_else(|| AssetNameExchange::from("UNKNOWN"));
let trade = Trade::new(
trade_id,
order_id,
symbol,
StrategyId::unknown(), time_exchange,
side,
last_price,
last_qty,
AssetFees::new(fee_asset, commission, None),
);
Some(UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::Trade(trade),
))
}
"CANCELED" | "EXPIRED" | "EXPIRED_IN_MATCH" => {
let filled_qty = report
.z
.as_deref()
.and_then(|s| Decimal::from_str(s).ok())
.unwrap_or(Decimal::ZERO);
let cancelled = Cancelled::new(order_id, time_exchange, filled_qty);
let response = UnindexedOrderResponseCancel {
key: OrderKey::new(
ExchangeId::BinanceSpot,
symbol,
StrategyId::unknown(), cid,
),
state: Ok(cancelled),
};
Some(UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderCancelled(response),
))
}
"REJECTED" => {
let reject_reason = report.r.unwrap_or_else(|| "unknown".to_string());
warn!(
%symbol, %order_id, reason = %reject_reason,
"BinanceSpot order REJECTED by matching engine"
);
let response = UnindexedOrderResponseCancel {
key: OrderKey::new(ExchangeId::BinanceSpot, symbol, StrategyId::unknown(), cid),
state: Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
reject_reason,
))),
};
Some(UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderCancelled(response),
))
}
"REPLACE" => {
let filled_qty = report
.z
.as_deref()
.and_then(|s| Decimal::from_str(s).ok())
.unwrap_or(Decimal::ZERO);
let cancelled = Cancelled::new(order_id, time_exchange, filled_qty);
let response = UnindexedOrderResponseCancel {
key: OrderKey::new(ExchangeId::BinanceSpot, symbol, StrategyId::unknown(), cid),
state: Ok(cancelled),
};
Some(UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderCancelled(response),
))
}
_ => {
trace!(exec_type, "BinanceSpot ignoring execution type");
None
}
}
}
/// Builds an `OrderSnapshot` event for a `NEW` execution report.
///
/// Returns `None` when the side (`S`) is missing or unknown, when the order type (`o`,
/// default `"LIMIT"`) is rejected by [`parse_order_kind`], when the price (`p`) or quantity
/// (`q`) cannot be parsed, when the quantity is missing, or when a limit-type order has no
/// price at all. A zero price is reported as "no price". A missing time-in-force (`f`) is
/// read as `"GTC"` and an unreadable filled quantity (`z`) as zero.
fn convert_new_order(
    report: &binance_sdk::spot::websocket_api::ExecutionReport,
    symbol: InstrumentNameExchange,
    cid: ClientOrderId,
    order_id: OrderId,
    time_exchange: DateTime<Utc>,
) -> Option<UnindexedAccountEvent> {
    let Some(side) = report.s_uppercase.as_deref().and_then(parse_side) else {
        warn!(%symbol, "BinanceSpot NEW event missing/unknown side (S), dropping");
        return None;
    };
    let kind = parse_order_kind(report.o.as_deref().unwrap_or("LIMIT"))?;

    let price: Option<Decimal> = match report.p.as_deref() {
        Some(raw) => match Decimal::from_str(raw) {
            Err(e) => {
                warn!(
                    %symbol,
                    price = raw,
                    error = %e,
                    "BinanceSpot NEW event unparseable price (p), dropping"
                );
                return None;
            }
            Ok(value) if value.is_zero() => {
                let is_limit_type = matches!(
                    kind,
                    OrderKind::Limit
                        | OrderKind::StopLimit { .. }
                        | OrderKind::TrailingStopLimit { .. }
                );
                if is_limit_type {
                    trace!(
                        %symbol,
                        %kind,
                        "BinanceSpot NEW event has zero price (p) on limit-type order, treating as no limit price"
                    );
                }
                None
            }
            Ok(value) => Some(value),
        },
        // No price field at all: acceptable only for order kinds that carry no limit price.
        None => match &kind {
            OrderKind::Market | OrderKind::Stop { .. } | OrderKind::TrailingStop { .. } => None,
            OrderKind::Limit
            | OrderKind::StopLimit { .. }
            | OrderKind::TrailingStopLimit { .. } => {
                warn!(%symbol, "BinanceSpot NEW limit-type order missing price (p), dropping");
                return None;
            }
        },
    };

    let Some(raw_qty) = report.q.as_deref() else {
        warn!(%symbol, "BinanceSpot NEW order missing quantity (q), dropping");
        return None;
    };
    let quantity = match Decimal::from_str(raw_qty) {
        Ok(value) => value,
        Err(e) => {
            warn!(
                %symbol,
                qty = raw_qty,
                error = %e,
                "BinanceSpot NEW event unparseable quantity (q), dropping"
            );
            return None;
        }
    };

    let time_in_force = parse_time_in_force(report.f.as_deref().unwrap_or("GTC"));
    let filled_qty = report
        .z
        .as_deref()
        .and_then(|raw| Decimal::from_str(raw).ok())
        .unwrap_or(Decimal::ZERO);

    let key = OrderKey::new(ExchangeId::BinanceSpot, symbol, StrategyId::unknown(), cid);
    let order = Order {
        key,
        side,
        price,
        quantity,
        kind,
        time_in_force,
        state: OrderState::active(Open::new(order_id, time_exchange, filled_qty)),
    };
    let snapshot = rustrade_integration::collection::snapshot::Snapshot::new(order);
    Some(UnindexedAccountEvent::new(
        ExchangeId::BinanceSpot,
        AccountEventKind::OrderSnapshot(snapshot),
    ))
}
/// Appends one `BalanceSnapshot` event to `buf` per usable entry of an
/// `outboundAccountPosition` message.
///
/// Entries lacking an asset name, or whose `free`/`locked` amount is missing or not a valid
/// decimal, are skipped with a warning. Every snapshot is stamped with the message's `u`
/// field read as milliseconds, or with `Utc::now()` when that is absent or invalid.
fn convert_account_position(
    position: binance_sdk::spot::websocket_api::OutboundAccountPosition,
    buf: &mut Vec<UnindexedAccountEvent>,
) {
    let parse_amount = |raw: Option<&str>| raw.and_then(|text| Decimal::from_str(text).ok());
    let time_exchange = position
        .u
        .and_then(|ms| Utc.timestamp_millis_opt(ms).single())
        .unwrap_or_else(Utc::now);

    for entry in position.b_uppercase.unwrap_or_default() {
        let Some(name) = entry.a else {
            warn!("BinanceSpot account position entry missing asset name");
            continue;
        };
        let asset = AssetNameExchange::new(name);

        let Some(free) = parse_amount(entry.f.as_deref()) else {
            warn!(%asset, "BinanceSpot account position missing/unparseable 'free' field");
            continue;
        };
        let Some(locked) = parse_amount(entry.l.as_deref()) else {
            warn!(%asset, "BinanceSpot account position missing/unparseable 'locked' field");
            continue;
        };

        let balance = AssetBalance::new(asset, Balance::new(free + locked, free), time_exchange);
        let snapshot = rustrade_integration::collection::snapshot::Snapshot::new(balance);
        buf.push(UnindexedAccountEvent::new(
            ExchangeId::BinanceSpot,
            AccountEventKind::BalanceSnapshot(snapshot),
        ));
    }
}
/// Parses a Binance order side; only the exact strings `"BUY"` and `"SELL"` are accepted.
///
/// Any other input is logged as a warning and yields `None`.
fn parse_side(s: &str) -> Option<Side> {
    let side = match s {
        "BUY" => Side::Buy,
        "SELL" => Side::Sell,
        _ => {
            warn!(side = s, "unknown Binance order side");
            return None;
        }
    };
    Some(side)
}
/// Maps a Binance order type string to an [`OrderKind`].
///
/// `MARKET` maps to `Market`; `LIMIT`, `LIMIT_MAKER`, `STOP_LOSS_LIMIT` and
/// `TAKE_PROFIT_LIMIT` all map to `Limit`. `STOP_LOSS`, `TAKE_PROFIT` and any unrecognised
/// type are logged as warnings and yield `None`.
fn parse_order_kind(t: &str) -> Option<OrderKind> {
    let kind = match t {
        "MARKET" => OrderKind::Market,
        "LIMIT" | "LIMIT_MAKER" | "STOP_LOSS_LIMIT" | "TAKE_PROFIT_LIMIT" => OrderKind::Limit,
        "STOP_LOSS" | "TAKE_PROFIT" => {
            warn!(
                order_type = t,
                "dropping conditional Binance order type (no OrderKind equivalent)"
            );
            return None;
        }
        _ => {
            warn!(order_type = t, "unsupported Binance order type");
            return None;
        }
    };
    Some(kind)
}
/// Maps a Binance time-in-force string to a [`TimeInForce`].
///
/// `GTX` is read as post-only good-until-cancelled and `GTD` as `GoodUntilEndOfDay`.
/// Unknown values are logged as warnings and treated like `GTC`.
fn parse_time_in_force(tif: &str) -> TimeInForce {
    let gtc = TimeInForce::GoodUntilCancelled { post_only: false };
    match tif {
        "GTC" => gtc,
        "GTX" => TimeInForce::GoodUntilCancelled { post_only: true },
        "IOC" => TimeInForce::ImmediateOrCancel,
        "FOK" => TimeInForce::FillOrKill,
        "GTD" => TimeInForce::GoodUntilEndOfDay,
        unknown => {
            warn!(
                time_in_force = unknown,
                "unknown Binance TimeInForce, defaulting to GTC"
            );
            gtc
        }
    }
}
/// Translates an internal order kind and time-in-force into the Binance order type and
/// optional time-in-force used when placing an order.
///
/// Market orders carry no time-in-force. Post-only limit orders become `LimitMaker` without
/// a time-in-force. `GoodUntilEndOfDay` is coerced to GTC with a warning. `GoodTillDate`,
/// `AtOpen`, `AtClose` and every stop or trailing order kind are logged as warnings and
/// yield `None`.
fn convert_order_kind_tif(
    kind: OrderKind,
    tif: TimeInForce,
) -> Option<(OrderPlaceTypeEnum, Option<OrderPlaceTimeInForceEnum>)> {
    // Settle every non-limit kind first; only plain limit orders depend on `tif`.
    match kind {
        OrderKind::Market => return Some((OrderPlaceTypeEnum::Market, None)),
        OrderKind::Limit => {}
        OrderKind::Stop { .. }
        | OrderKind::StopLimit { .. }
        | OrderKind::TrailingStop { .. }
        | OrderKind::TrailingStopLimit { .. } => {
            warn!(
                "Binance connector does not yet support OrderKind::{:?}",
                kind
            );
            return None;
        }
    }

    let limit_with = |binance_tif: OrderPlaceTimeInForceEnum| {
        Some((OrderPlaceTypeEnum::Limit, Some(binance_tif)))
    };
    match tif {
        TimeInForce::GoodUntilCancelled { post_only: true } => {
            Some((OrderPlaceTypeEnum::LimitMaker, None))
        }
        TimeInForce::GoodUntilCancelled { post_only: false } => {
            limit_with(OrderPlaceTimeInForceEnum::Gtc)
        }
        TimeInForce::FillOrKill => limit_with(OrderPlaceTimeInForceEnum::Fok),
        TimeInForce::ImmediateOrCancel => limit_with(OrderPlaceTimeInForceEnum::Ioc),
        TimeInForce::GoodUntilEndOfDay => {
            warn!("Binance Spot does not support GTD; coercing to GTC");
            limit_with(OrderPlaceTimeInForceEnum::Gtc)
        }
        TimeInForce::GoodTillDate { .. } | TimeInForce::AtOpen | TimeInForce::AtClose => {
            warn!(
                time_in_force = ?tif,
                "Binance Spot does not support this TimeInForce"
            );
            None
        }
    }
}
/// Returns `true` when `needle` occurs in `haystack`, ignoring ASCII case.
///
/// The comparison is byte-wise, so only ASCII letters are case-folded. An empty `needle` is
/// contained in every haystack, matching `str::contains`.
fn contains_ignore_case(haystack: &str, needle: &str) -> bool {
    let needle = needle.as_bytes();
    // `slice::windows` panics on a window size of zero, so answer the empty case directly.
    if needle.is_empty() {
        return true;
    }
    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
/// Returns `true` when `code` occurs in `msg` as a standalone number, i.e. with no ASCII
/// digit directly before or after it.
///
/// This keeps `-2013` from matching inside `-20130` or `1-2013`. Every occurrence is
/// examined, so a later valid occurrence still matches after an earlier one fails the digit
/// guard.
fn contains_error_code(msg: &str, code: &str) -> bool {
    let is_digit = |c: char| c.is_ascii_digit();
    let mut search_from = 0;
    while let Some(offset) = msg[search_from..].find(code) {
        let pos = search_from + offset;
        let before = &msg[..pos];
        let after = &msg[pos + code.len()..];
        if !before.ends_with(is_digit) && !after.starts_with(is_digit) {
            return true;
        }
        // Resume one whole character further on. Stepping a single byte could split a
        // multi-byte character or run past the end of `msg`, and slicing there would panic.
        match msg[pos..].chars().next() {
            Some(c) => search_from = pos + c.len_utf8(),
            None => break,
        }
    }
    false
}
/// Classifies a Binance error message into an [`ApiError`].
///
/// Numeric codes are checked first, using [`contains_error_code`]:
/// - `-1002`, `-2015` → `Unauthenticated`
/// - `-1003`, `-1015` → `RateLimit`
/// - `-2011`, `-2013` → `OrderAlreadyCancelled`
/// - `-1121` → `InstrumentInvalid`
///
/// Text heuristics follow (case-insensitive): "insufficient" / "not enough" →
/// `BalanceInsufficient`, "rate limit" → `RateLimit`, "invalid symbol" →
/// `InstrumentInvalid`; everything else is `OrderRejected`.
///
/// `-2010` is Binance's generic new-order rejection and covers more than a lack of funds
/// (for example a post-only order that would match immediately). It is therefore reported as
/// `BalanceInsufficient` only when the message text says so, and as `OrderRejected`
/// otherwise.
///
/// `BalanceInsufficient` carries the instrument name as its asset name, since the message
/// does not identify the asset.
fn parse_binance_api_error(
    error_msg: String,
    instrument: &InstrumentNameExchange,
) -> ApiError<AssetNameExchange, InstrumentNameExchange> {
    if contains_error_code(&error_msg, "-1002") || contains_error_code(&error_msg, "-2015") {
        return ApiError::Unauthenticated(error_msg);
    }
    if contains_error_code(&error_msg, "-1003") || contains_error_code(&error_msg, "-1015") {
        return ApiError::RateLimit;
    }
    if contains_error_code(&error_msg, "-2011") || contains_error_code(&error_msg, "-2013") {
        return ApiError::OrderAlreadyCancelled;
    }
    if contains_error_code(&error_msg, "-1121") {
        return ApiError::InstrumentInvalid(instrument.clone(), error_msg);
    }

    let insufficient_balance = contains_ignore_case(&error_msg, "insufficient")
        || contains_ignore_case(&error_msg, "not enough");
    if insufficient_balance {
        return ApiError::BalanceInsufficient(
            AssetNameExchange::new(instrument.name().as_str()),
            error_msg,
        );
    }
    // A new-order rejection that does not mention the balance is a plain rejection.
    if contains_error_code(&error_msg, "-2010") {
        return ApiError::OrderRejected(error_msg);
    }

    if contains_ignore_case(&error_msg, "rate limit") {
        ApiError::RateLimit
    } else if contains_ignore_case(&error_msg, "unknown order") {
        // Checked before "invalid symbol" so a message naming both stays a rejection.
        ApiError::OrderRejected(error_msg)
    } else if contains_ignore_case(&error_msg, "invalid symbol") {
        ApiError::InstrumentInvalid(instrument.clone(), error_msg)
    } else {
        ApiError::OrderRejected(error_msg)
    }
}
/// Converts a transport-level `anyhow::Error` into an [`UnindexedClientError`].
///
/// The error is rendered with its full cause chain (`{e:#}`). If the text contains error
/// code `-1002` or `-2015`, or one of the known invalid-key / invalid-signature phrases, the
/// result is `ApiError::Unauthenticated`; otherwise it is a socket connectivity error.
fn connectivity_error(e: anyhow::Error) -> UnindexedClientError {
    const AUTH_CODES: [&str; 2] = ["-1002", "-2015"];
    const AUTH_PHRASES: [&str; 3] = [
        "invalid api-key",
        "invalid signature",
        "signature for this request is not valid",
    ];

    let msg = format!("{e:#}");
    let is_auth_failure = AUTH_CODES
        .iter()
        .any(|code| contains_error_code(&msg, code))
        || AUTH_PHRASES
            .iter()
            .any(|phrase| contains_ignore_case(&msg, phrase));

    if is_auth_failure {
        UnindexedClientError::Api(ApiError::Unauthenticated(msg))
    } else {
        UnindexedClientError::Connectivity(ConnectivityError::Socket(msg))
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)] mod tests {
use super::*;
use crate::order::TrailingOffsetType;
/// `parse_side` accepts only the exact upper-case `BUY` / `SELL` strings.
#[test]
fn test_parse_side() {
    let cases = [
        ("BUY", Some(Side::Buy)),
        ("SELL", Some(Side::Sell)),
        ("buy", None),
        ("UNKNOWN", None),
    ];
    for (raw, expected) in cases {
        assert_eq!(parse_side(raw), expected);
    }
}
/// Limit-like types map to `Limit`, `MARKET` to `Market`, everything else to `None`.
#[test]
fn test_parse_order_kind() {
    let cases = [
        ("MARKET", Some(OrderKind::Market)),
        ("LIMIT", Some(OrderKind::Limit)),
        ("LIMIT_MAKER", Some(OrderKind::Limit)),
        ("STOP_LOSS", None),
        ("TAKE_PROFIT", None),
        ("STOP_LOSS_LIMIT", Some(OrderKind::Limit)),
        ("TAKE_PROFIT_LIMIT", Some(OrderKind::Limit)),
        ("UNKNOWN_TYPE", None),
    ];
    for (raw, expected) in cases {
        assert_eq!(parse_order_kind(raw), expected);
    }
}
/// Every known time-in-force string maps to its variant; unknown input falls back to GTC.
#[test]
fn test_parse_time_in_force() {
    let cases = [
        ("GTC", TimeInForce::GoodUntilCancelled { post_only: false }),
        ("GTX", TimeInForce::GoodUntilCancelled { post_only: true }),
        ("IOC", TimeInForce::ImmediateOrCancel),
        ("FOK", TimeInForce::FillOrKill),
        ("GTD", TimeInForce::GoodUntilEndOfDay),
        ("UNKNOWN", TimeInForce::GoodUntilCancelled { post_only: false }),
    ];
    for (raw, expected) in cases {
        assert_eq!(parse_time_in_force(raw), expected);
    }
}
/// Covers the supported kind / time-in-force combinations and two unsupported order kinds.
#[test]
fn test_convert_order_kind_tif() {
    use rust_decimal::Decimal;

    let gtc = || TimeInForce::GoodUntilCancelled { post_only: false };
    let limit = |tif: TimeInForce| convert_order_kind_tif(OrderKind::Limit, tif);

    assert!(matches!(
        convert_order_kind_tif(OrderKind::Market, TimeInForce::ImmediateOrCancel),
        Some((OrderPlaceTypeEnum::Market, None))
    ));
    assert!(matches!(
        limit(gtc()),
        Some((
            OrderPlaceTypeEnum::Limit,
            Some(OrderPlaceTimeInForceEnum::Gtc)
        ))
    ));
    assert!(matches!(
        limit(TimeInForce::GoodUntilCancelled { post_only: true }),
        Some((OrderPlaceTypeEnum::LimitMaker, None))
    ));
    assert!(matches!(
        limit(TimeInForce::FillOrKill),
        Some((
            OrderPlaceTypeEnum::Limit,
            Some(OrderPlaceTimeInForceEnum::Fok)
        ))
    ));
    assert!(matches!(
        limit(TimeInForce::ImmediateOrCancel),
        Some((
            OrderPlaceTypeEnum::Limit,
            Some(OrderPlaceTimeInForceEnum::Ioc)
        ))
    ));
    assert!(matches!(
        limit(TimeInForce::GoodUntilEndOfDay),
        Some((
            OrderPlaceTypeEnum::Limit,
            Some(OrderPlaceTimeInForceEnum::Gtc)
        ))
    ));

    let stop = OrderKind::Stop {
        trigger_price: Decimal::from(100),
    };
    assert!(convert_order_kind_tif(stop, gtc()).is_none());

    let trailing_stop = OrderKind::TrailingStop {
        offset: Decimal::from(5),
        offset_type: TrailingOffsetType::Percentage,
    };
    assert!(convert_order_kind_tif(trailing_stop, gtc()).is_none());
}
/// Exercises both the numeric-code and the text-heuristic classification paths.
#[test]
fn test_parse_binance_api_error() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let classify = |msg: &str| parse_binance_api_error(msg.into(), &instrument);

    assert!(matches!(
        classify("Insufficient balance"),
        ApiError::BalanceInsufficient(_, _)
    ));
    assert!(matches!(
        classify("Not enough funds"),
        ApiError::BalanceInsufficient(_, _)
    ));

    assert_eq!(classify("Rate limit exceeded"), ApiError::RateLimit);
    assert_eq!(classify("Error code -1015"), ApiError::RateLimit);
    assert_eq!(
        classify("Error -1003: too many requests"),
        ApiError::RateLimit
    );

    assert_eq!(
        classify("Error code -2011"),
        ApiError::OrderAlreadyCancelled
    );
    assert!(matches!(
        classify("Unknown order sent -2013"),
        ApiError::OrderAlreadyCancelled
    ));
    assert!(matches!(
        classify("Order does not exist -2013"),
        ApiError::OrderAlreadyCancelled
    ));

    assert!(matches!(
        classify("Unknown order encountered"),
        ApiError::OrderRejected(_)
    ));
    assert!(matches!(
        classify("Invalid symbol -1121"),
        ApiError::InstrumentInvalid(_, _)
    ));
    assert!(matches!(
        classify("Server-side response error (code -2010): Account has insufficient balance"),
        ApiError::BalanceInsufficient(_, _)
    ));
    assert!(matches!(
        classify("Some other error"),
        ApiError::OrderRejected(_)
    ));
}
/// A code directly followed by another digit must not match.
#[test]
fn test_contains_error_code_suffix_guard() {
    let cases = [
        ("-20130", false, "-20130 should not match -2013"),
        ("-20131", false, "-20131 should not match -2013"),
        ("-2013", true, "exact match"),
        ("Error -2013: text", true, "match with trailing text"),
    ];
    for (msg, expected, why) in cases {
        assert_eq!(contains_error_code(msg, "-2013"), expected, "{why}");
    }
}
/// A code directly preceded by a digit must not match; other prefixes are fine.
#[test]
fn test_contains_error_code_prefix_guard() {
    let cases = [
        ("1-2013", false, "1-2013 should not match -2013"),
        ("error 1-2013 text", false, "embedded 1-2013 should not match"),
        ("code=-2013,", true, "=-2013 prefix should match"),
        (" -2013 ", true, "space prefix should match"),
    ];
    for (msg, expected, why) in cases {
        assert_eq!(contains_error_code(msg, "-2013"), expected, "{why}");
    }
}
/// One valid occurrence is enough, wherever it sits relative to an invalid one.
#[test]
fn test_contains_error_code_second_occurrence_valid() {
    let invalid_then_valid = "response code -20130 or -2013: rate limit";
    assert!(
        contains_error_code(invalid_then_valid, "-2013"),
        "second valid occurrence must match when first fails digit guard"
    );

    let valid_then_invalid = "-2013 or -20130";
    assert!(
        contains_error_code(valid_then_invalid, "-2013"),
        "first valid occurrence must match when second is a longer code"
    );
}
/// A repeated key is a duplicate; the same instrument and id with another kind is not.
#[test]
fn test_dedup_cache() {
    let make_key = |kind: DedupEventKind| DedupKey {
        instrument: SmolStr::from("BTCUSDT"),
        id: SmolStr::from("12345"),
        kind,
    };
    let cache = new_dedup_cache();

    assert!(!is_duplicate(&cache, make_key(DedupEventKind::Trade)));
    assert!(is_duplicate(&cache, make_key(DedupEventKind::Trade)));
    assert!(!is_duplicate(&cache, make_key(DedupEventKind::New)));
}
/// Rate-limit phrases and codes are detected; unrelated text and longer codes are not.
#[test]
fn test_is_rate_limit_error() {
    let rate_limited = [
        "Too many requests. You are being rate-limited. Please slow down.",
        "The IP address has been banned for exceeding rate limits. Contact support.",
        "Error -1015: too many new orders",
        "Error -1003: too many requests",
    ];
    for msg in rate_limited {
        assert!(is_rate_limit_error(&anyhow::Error::msg(msg)));
    }

    let unrelated = ["order 4290 failed", "connection timeout", "unknown error"];
    for msg in unrelated {
        assert!(!is_rate_limit_error(&anyhow::Error::msg(msg)));
    }

    let longer_code = anyhow::Error::msg("Error -10150: some other error");
    assert!(
        !is_rate_limit_error(&longer_code),
        "-10150 should not match -1015"
    );
    let longer_code = anyhow::Error::msg("Error -10030: some other error");
    assert!(
        !is_rate_limit_error(&longer_code),
        "-10030 should not match -1003"
    );
}
/// Only a typed `WebsocketError::ResponseError` counts as an API rejection.
#[test]
fn test_is_api_rejection_error() {
    let response_error = WebsocketError::ResponseError {
        code: -2010,
        message: "Account has insufficient balance for requested action.".into(),
    };
    assert!(
        is_api_rejection_error(&anyhow::anyhow!(response_error)),
        "ResponseError should be detected as API rejection"
    );

    let transport_error = anyhow::anyhow!("connection reset by peer");
    assert!(
        !is_api_rejection_error(&transport_error),
        "plain transport error should not be detected as API rejection"
    );

    let rate_limit_error = anyhow::anyhow!("Too many requests. You are being rate-limited.");
    assert!(
        !is_api_rejection_error(&rate_limit_error),
        "rate-limit string error should not be detected as API rejection"
    );
}
/// Auth-related codes and phrases become `Unauthenticated`; a timeout stays `Connectivity`.
#[test]
fn test_connectivity_error_detects_auth_failures() {
    // (error text, label used in the failure message)
    let auth_cases = [
        ("Error -1002: unauthorized", "-1002"),
        ("Error -2015: permission denied for action", "-2015"),
        ("invalid signature provided", "'invalid signature'"),
        (
            "The signature for this request is not valid.",
            "'signature for this request is not valid'",
        ),
        ("Invalid API-key format", "'invalid api-key'"),
    ];
    for (message, label) in auth_cases {
        let err = connectivity_error(anyhow::Error::msg(message));
        assert!(
            matches!(err, UnindexedClientError::Api(ApiError::Unauthenticated(_))),
            "expected Unauthenticated for {label}, got {err:?}"
        );
    }

    let err = connectivity_error(anyhow::Error::msg("connection timeout"));
    assert!(
        matches!(err, UnindexedClientError::Connectivity(_)),
        "expected Connectivity for timeout, got {err:?}"
    );
}
/// The dedup key of a trade carries both the trade id and the instrument, so equal trade
/// ids on different symbols produce distinct keys.
#[test]
fn test_dedup_key_from_event_trade_includes_instrument() {
    use crate::order::id::{OrderId, StrategyId};
    use crate::trade::{AssetFees, Trade, TradeId};
    use chrono::Utc;
    use rust_decimal::Decimal;
    use rustrade_instrument::Side;

    // Both events share trade id "9001"; only the symbol and order id differ.
    let trade_event = |symbol: &str, order_id: OrderId| {
        let trade = Trade::<AssetNameExchange, InstrumentNameExchange>::new(
            TradeId::new("9001"),
            order_id,
            InstrumentNameExchange::new(symbol),
            StrategyId::unknown(),
            Utc::now(),
            Side::Buy,
            Decimal::ZERO,
            Decimal::ZERO,
            AssetFees::new(
                AssetNameExchange::from("USDT"),
                Decimal::ZERO,
                Some(Decimal::ZERO),
            ),
        );
        UnindexedAccountEvent::new(ExchangeId::BinanceSpot, AccountEventKind::Trade(trade))
    };

    let btc_event = trade_event("BTCUSDT", OrderId::new("4242"));
    let key = dedup_key_from_event(&btc_event).expect("Trade should produce a DedupKey");
    assert_eq!(key.kind, DedupEventKind::Trade);
    assert_eq!(key.id.as_str(), "9001", "key.id should be the trade ID");
    assert_eq!(
        key.instrument.as_str(),
        "BTCUSDT",
        "key.instrument should be the symbol"
    );

    let eth_event = trade_event("ETHUSDT", OrderId::new("7777"));
    let key2 = dedup_key_from_event(&eth_event).expect("Trade should produce a DedupKey");
    assert_ne!(
        key, key2,
        "same trade ID on different symbols must produce distinct keys"
    );
}
/// Builds a fully populated account-position balance entry for the tests below.
fn make_balance_inner(
    asset: &str,
    free: &str,
    locked: &str,
) -> binance_sdk::spot::websocket_api::OutboundAccountPositionBInner {
    use binance_sdk::spot::websocket_api::OutboundAccountPositionBInner;

    OutboundAccountPositionBInner {
        a: Some(String::from(asset)),
        f: Some(String::from(free)),
        l: Some(String::from(locked)),
    }
}
/// One valid entry yields one snapshot with `total = free + locked`.
#[test]
fn test_convert_account_position_happy_path() {
    use binance_sdk::spot::websocket_api::OutboundAccountPosition;

    let position = OutboundAccountPosition {
        u: Some(1_700_000_000_000),
        b_uppercase: Some(vec![make_balance_inner("BTC", "1.5", "0.5")]),
        ..Default::default()
    };
    let mut events = Vec::new();
    convert_account_position(position, &mut events);

    assert_eq!(events.len(), 1);
    let AccountEventKind::BalanceSnapshot(snapshot) = &events[0].kind else {
        panic!("expected BalanceSnapshot, got {:?}", events[0].kind);
    };
    let balance = &snapshot.0;
    assert_eq!(balance.asset.as_ref(), "BTC");
    assert_eq!(balance.balance.total, Decimal::from_str("2.0").unwrap());
    assert_eq!(balance.balance.free, Decimal::from_str("1.5").unwrap());
}
/// A missing `u` timestamp does not prevent the snapshot from being emitted.
#[test]
fn test_convert_account_position_u_field_none_uses_now() {
    use binance_sdk::spot::websocket_api::OutboundAccountPosition;

    let position = OutboundAccountPosition {
        u: None,
        b_uppercase: Some(vec![make_balance_inner("ETH", "2.0", "0.0")]),
        ..Default::default()
    };
    let mut events = Vec::new();
    convert_account_position(position, &mut events);

    assert_eq!(events.len(), 1);
}
/// An entry without an asset name is skipped; the following valid entry is still emitted.
#[test]
fn test_convert_account_position_missing_asset_name_skipped() {
    use binance_sdk::spot::websocket_api::{
        OutboundAccountPosition, OutboundAccountPositionBInner,
    };

    let nameless = OutboundAccountPositionBInner {
        a: None,
        f: Some("1.0".to_string()),
        l: Some("0.0".to_string()),
    };
    let position = OutboundAccountPosition {
        u: Some(1_700_000_000_000),
        b_uppercase: Some(vec![nameless, make_balance_inner("USDT", "100.0", "0.0")]),
        ..Default::default()
    };
    let mut events = Vec::new();
    convert_account_position(position, &mut events);

    assert_eq!(events.len(), 1);
    let AccountEventKind::BalanceSnapshot(snapshot) = &events[0].kind else {
        panic!("expected BalanceSnapshot, got {:?}", events[0].kind);
    };
    assert_eq!(snapshot.0.asset.as_ref(), "USDT");
}
/// An entry with a non-numeric `free` amount is skipped; the next valid entry is emitted.
#[test]
fn test_convert_account_position_unparseable_free_skipped() {
    use binance_sdk::spot::websocket_api::{
        OutboundAccountPosition, OutboundAccountPositionBInner,
    };

    let bad_free = OutboundAccountPositionBInner {
        a: Some("BTC".to_string()),
        f: Some("not-a-number".to_string()),
        l: Some("0.0".to_string()),
    };
    let position = OutboundAccountPosition {
        u: Some(1_700_000_000_000),
        b_uppercase: Some(vec![bad_free, make_balance_inner("ETH", "1.0", "0.0")]),
        ..Default::default()
    };
    let mut events = Vec::new();
    convert_account_position(position, &mut events);

    assert_eq!(events.len(), 1);
    let AccountEventKind::BalanceSnapshot(snapshot) = &events[0].kind else {
        panic!("expected BalanceSnapshot, got {:?}", events[0].kind);
    };
    assert_eq!(snapshot.0.asset.as_ref(), "ETH");
}
/// An empty balance list produces no events.
#[test]
fn test_convert_account_position_empty_balances() {
    use binance_sdk::spot::websocket_api::OutboundAccountPosition;

    let position = OutboundAccountPosition {
        u: Some(1_700_000_000_000),
        b_uppercase: Some(Vec::new()),
        ..Default::default()
    };
    let mut events = Vec::new();
    convert_account_position(position, &mut events);

    assert!(
        events.is_empty(),
        "empty balance list should produce no events"
    );
}
/// A position update with no balance list at all (`B` absent) must not emit any event.
#[test]
fn test_convert_account_position_b_field_none() {
    use binance_sdk::spot::websocket_api::OutboundAccountPosition;

    let mut events = Vec::new();
    convert_account_position(
        OutboundAccountPosition {
            u: Some(1_700_000_000_000),
            b_uppercase: None,
            ..Default::default()
        },
        &mut events,
    );

    let nothing_emitted = events.is_empty();
    assert!(nothing_emitted, "None B field should produce no events");
}
/// `wait` must report `true` for the first `MAX_RECONNECT_ATTEMPTS` calls and `false` afterwards.
#[tokio::test]
async fn test_exponential_backoff_exhaustion() {
    // Pause the tokio clock before the backoff is created and waited on.
    tokio::time::pause();
    let mut backoff = ExponentialBackoff::new();

    for attempt in 0..MAX_RECONNECT_ATTEMPTS {
        let may_retry = backoff.wait().await;
        assert!(
            may_retry,
            "expected true on attempt {i} (before exhaustion)",
            i = attempt
        );
    }

    let may_retry = backoff.wait().await;
    assert!(!may_retry, "expected false after max attempts");
}
/// After exhausting every attempt, `reset` must make `wait` succeed again.
#[tokio::test]
async fn test_exponential_backoff_reset() {
    tokio::time::pause();
    let mut backoff = ExponentialBackoff::new();

    // Burn through the whole attempt budget; the individual results are irrelevant here.
    for _ in 0..MAX_RECONNECT_ATTEMPTS {
        backoff.wait().await;
    }
    let exhausted = !backoff.wait().await;
    assert!(exhausted, "should be exhausted");

    backoff.reset();
    let usable_again = backoff.wait().await;
    assert!(usable_again, "should succeed again after reset");
}
/// Builds an execution report with symbol, order id, client order id and transaction time set.
///
/// Every other field is left at its `Default` value so tests can layer on what they need.
fn make_base_report() -> binance_sdk::spot::websocket_api::ExecutionReport {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    ExecutionReport {
        s: Some(String::from("BTCUSDT")),
        i: Some(12345),
        c: Some(String::from("client-1")),
        t_uppercase: Some(1_700_000_000_000),
        ..ExecutionReport::default()
    }
}
/// A NEW limit-buy report must convert into an `OrderSnapshot` that mirrors the report's fields.
#[test]
fn test_convert_execution_report_new() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let text = |value: &str| Some(value.to_owned());
    let report = ExecutionReport {
        x: text("NEW"),
        s_uppercase: text("BUY"),
        o: text("LIMIT"),
        p: text("50000.00"),
        q: text("0.01"),
        f: text("GTC"),
        z: text("0"),
        ..make_base_report()
    };

    let event = convert_execution_report(report).expect("NEW event should produce Some");
    assert_eq!(event.exchange, ExchangeId::BinanceSpot);

    let AccountEventKind::OrderSnapshot(snap) = &event.kind else {
        let other = &event.kind;
        panic!("NEW should yield OrderSnapshot, got {other:?}");
    };
    let order = &snap.0;
    let expected_price = Decimal::from_str("50000.00").unwrap();
    let expected_quantity = Decimal::from_str("0.01").unwrap();
    assert_eq!(order.side, Side::Buy);
    assert_eq!(order.kind, OrderKind::Limit);
    assert_eq!(order.price, Some(expected_price));
    assert_eq!(order.quantity, expected_quantity);
    assert_eq!(order.key.cid.0.as_str(), "client-1");
    assert_eq!(order.key.instrument.name().as_str(), "BTCUSDT");
}
/// A TRADE report with all fill fields must convert into a `Trade` carrying the same values.
#[test]
fn test_convert_execution_report_trade() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let fill = ExecutionReport {
        x: Some("TRADE".to_owned()),
        s_uppercase: Some("BUY".to_owned()),
        t: Some(9999),
        l_uppercase: Some("50000.00".to_owned()),
        l: Some("0.01".to_owned()),
        n: Some("0.000001".to_owned()),
        ..make_base_report()
    };

    let event = convert_execution_report(fill).expect("TRADE event should produce Some");
    assert_eq!(event.exchange, ExchangeId::BinanceSpot);

    let AccountEventKind::Trade(trade) = &event.kind else {
        let other = &event.kind;
        panic!("TRADE should yield Trade, got {other:?}");
    };
    let expected_price = Decimal::from_str("50000.00").unwrap();
    let expected_quantity = Decimal::from_str("0.01").unwrap();
    assert_eq!(trade.side, Side::Buy);
    assert_eq!(trade.price, expected_price);
    assert_eq!(trade.quantity, expected_quantity);
    assert_eq!(trade.id.0.as_str(), "9999");
    assert_eq!(trade.order_id.0.as_str(), "12345");
    assert_eq!(trade.instrument.name().as_str(), "BTCUSDT");
}
/// A TRADE report lacking the last executed price (`L`) must be rejected.
#[test]
fn test_convert_execution_report_trade_missing_last_price() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let fill_without_price = ExecutionReport {
        x: Some(String::from("TRADE")),
        s_uppercase: Some(String::from("BUY")),
        t: Some(9999),
        l_uppercase: None,
        l: Some(String::from("0.01")),
        ..make_base_report()
    };

    let converted = convert_execution_report(fill_without_price);
    assert!(
        converted.is_none(),
        "missing last price (L) should return None"
    );
}
/// A TRADE report lacking the last executed quantity (`l`) must be rejected.
#[test]
fn test_convert_execution_report_trade_missing_last_qty() {
    let mut fill_without_qty = make_base_report();
    fill_without_qty.x = Some("TRADE".to_owned());
    fill_without_qty.s_uppercase = Some("BUY".to_owned());
    fill_without_qty.t = Some(9999);
    fill_without_qty.l_uppercase = Some("50000.00".to_owned());
    fill_without_qty.l = None;

    let converted = convert_execution_report(fill_without_qty);
    assert!(
        converted.is_none(),
        "missing last qty (l) should return None"
    );
}
/// A CANCELED report must convert into `OrderCancelled` whose state is `Ok`.
#[test]
fn test_convert_execution_report_canceled() {
    let mut report = make_base_report();
    report.x = Some("CANCELED".to_owned());

    let event = convert_execution_report(report).expect("CANCELED should produce Some");

    let cancelled_ok = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_ok()
    );
    assert!(
        cancelled_ok,
        "CANCELED should yield OrderCancelled with Ok state"
    );
}
/// An EXPIRED report must convert into `OrderCancelled` whose state is `Ok`.
#[test]
fn test_convert_execution_report_expired() {
    let mut report = make_base_report();
    report.x = Some("EXPIRED".to_owned());

    let event = convert_execution_report(report).expect("EXPIRED should produce Some");

    let cancelled_ok = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_ok()
    );
    assert!(
        cancelled_ok,
        "EXPIRED should yield OrderCancelled with Ok state"
    );
}
/// An EXPIRED_IN_MATCH report must convert into `OrderCancelled` whose state is `Ok`.
#[test]
fn test_convert_execution_report_expired_in_match() {
    let mut report = make_base_report();
    report.x = Some("EXPIRED_IN_MATCH".to_owned());

    let event = convert_execution_report(report).expect("EXPIRED_IN_MATCH should produce Some");

    let cancelled_ok = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_ok()
    );
    assert!(
        cancelled_ok,
        "EXPIRED_IN_MATCH should yield OrderCancelled with Ok state"
    );
}
/// A REJECTED report with a reject reason must convert into `OrderCancelled` whose state is `Err`.
#[test]
fn test_convert_execution_report_rejected() {
    let mut report = make_base_report();
    report.x = Some("REJECTED".to_owned());
    report.r = Some("INSUFFICIENT_FUNDS".to_owned());

    let event = convert_execution_report(report).expect("REJECTED should produce Some");

    let cancelled_err = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_err()
    );
    assert!(
        cancelled_err,
        "REJECTED should yield OrderCancelled with Err state"
    );
}
/// A report that has a symbol but no execution type (`x`) must be rejected.
#[test]
fn test_convert_execution_report_missing_exec_type() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let symbol_only = ExecutionReport {
        s: Some(String::from("BTCUSDT")),
        ..ExecutionReport::default()
    };

    let converted = convert_execution_report(symbol_only);
    assert!(
        converted.is_none(),
        "missing execution type should return None"
    );
}
/// A report that has an execution type but no symbol (`s`) must be rejected.
#[test]
fn test_convert_execution_report_missing_symbol() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let exec_type_only = ExecutionReport {
        x: Some(String::from("NEW")),
        ..ExecutionReport::default()
    };

    let converted = convert_execution_report(exec_type_only);
    assert!(converted.is_none(), "missing symbol should return None");
}
/// A report with execution type and symbol but no order id (`i`) must be rejected.
#[test]
fn test_convert_execution_report_missing_order_id() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let without_order_id = ExecutionReport {
        x: Some(String::from("NEW")),
        s: Some(String::from("BTCUSDT")),
        i: None,
        ..ExecutionReport::default()
    };

    let converted = convert_execution_report(without_order_id);
    assert!(converted.is_none(), "missing orderId should return None");
}
/// A TRADE report must be rejected when the trade id (`t`) is the only thing missing.
///
/// Every other field populated by the TRADE happy-path test (side, last price, last
/// quantity, commission) is set here as well, so a `None` result can only be attributed
/// to the absent trade id rather than to some other missing fill field.
#[test]
fn test_convert_execution_report_trade_missing_trade_id() {
    let report = binance_sdk::spot::websocket_api::ExecutionReport {
        x: Some("TRADE".to_string()),
        s_uppercase: Some("BUY".to_string()),
        // The field under test: no trade id.
        t: None,
        l_uppercase: Some("50000.00".to_string()),
        l: Some("0.01".to_string()),
        n: Some("0.000001".to_string()),
        ..make_base_report()
    };
    assert!(
        convert_execution_report(report).is_none(),
        "TRADE event missing tradeId should return None"
    );
}
/// A REPLACE report must convert into `OrderCancelled` whose state is `Ok`.
#[test]
fn test_convert_execution_report_replace_yields_cancelled() {
    let mut report = make_base_report();
    report.x = Some("REPLACE".to_owned());

    let event = convert_execution_report(report).expect("REPLACE should produce Some");

    let cancelled_ok = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_ok()
    );
    assert!(
        cancelled_ok,
        "REPLACE should yield OrderCancelled with Ok state"
    );
}
/// Builds a REST account balance entry with asset, free and locked all populated.
fn make_balance(
    asset: &str,
    free: &str,
    locked: &str,
) -> binance_sdk::spot::rest_api::GetAccountResponseBalancesInner {
    use binance_sdk::spot::rest_api::GetAccountResponseBalancesInner;

    let owned = |text: &str| Some(text.to_owned());
    GetAccountResponseBalancesInner {
        asset: owned(asset),
        free: owned(free),
        locked: owned(locked),
    }
}
/// With an empty asset filter, every balance in the response must be returned.
#[test]
fn test_filter_balances_empty_assets_returns_all() {
    let btc = make_balance("BTC", "1.0", "0.0");
    let usdt = make_balance("USDT", "500.0", "50.0");

    let converted = filter_and_convert_balances(vec![btc, usdt], &[]);

    assert_eq!(
        converted.len(),
        2,
        "empty filter should return all balances"
    );
}
/// Filtering on BTC must return only BTC, with total = free + locked and free preserved.
#[test]
fn test_filter_balances_matching_asset_returned() {
    let response = vec![
        make_balance("BTC", "1.5", "0.5"),
        make_balance("ETH", "10.0", "0.0"),
    ];
    let wanted = [AssetNameExchange::new("BTC")];

    let converted = filter_and_convert_balances(response, &wanted);

    assert_eq!(converted.len(), 1);
    let only = &converted[0];
    assert_eq!(only.asset, AssetNameExchange::new("BTC"));
    assert_eq!(only.balance.total, Decimal::from_str("2.0").unwrap());
    assert_eq!(only.balance.free, Decimal::from_str("1.5").unwrap());
}
/// Filtering on an asset absent from the response must yield nothing.
#[test]
fn test_filter_balances_non_matching_asset_filtered_out() {
    let response = vec![
        make_balance("BTC", "1.0", "0.0"),
        make_balance("ETH", "2.0", "0.0"),
    ];
    let wanted = [AssetNameExchange::new("USDT")];

    let converted = filter_and_convert_balances(response, &wanted);

    let nothing_left = converted.is_empty();
    assert!(nothing_left, "non-matching asset should be filtered out");
}
/// A balance entry without an asset name must be skipped; the USDT entry must survive.
#[test]
fn test_filter_balances_missing_asset_field_skipped() {
    use binance_sdk::spot::rest_api::GetAccountResponseBalancesInner;

    let nameless = GetAccountResponseBalancesInner {
        asset: None,
        free: Some(String::from("1.0")),
        locked: Some(String::from("0.0")),
    };
    let named = make_balance("USDT", "100.0", "0.0");

    let converted = filter_and_convert_balances(vec![nameless, named], &[]);

    assert_eq!(
        converted.len(),
        1,
        "entry with missing asset should be skipped"
    );
    let survivor = &converted[0];
    assert_eq!(survivor.asset, AssetNameExchange::new("USDT"));
}
/// A balance whose free amount cannot be parsed must be skipped entirely.
#[test]
fn test_filter_balances_unparseable_free_skipped() {
    use binance_sdk::spot::rest_api::GetAccountResponseBalancesInner;

    let garbled = GetAccountResponseBalancesInner {
        asset: Some(String::from("BTC")),
        free: Some(String::from("not-a-number")),
        locked: Some(String::from("0.0")),
    };

    let converted = filter_and_convert_balances(vec![garbled], &[]);

    let nothing_left = converted.is_empty();
    assert!(nothing_left, "unparseable free field should be skipped");
}
/// Zero balances must be kept, and must convert to an exact zero total and free amount.
#[test]
fn test_filter_balances_zero_balance_included() {
    let response = vec![
        make_balance("BTC", "0.00000000", "0.00000000"),
        make_balance("USDT", "100.0", "0.0"),
    ];

    let converted = filter_and_convert_balances(response, &[]);
    assert_eq!(converted.len(), 2, "zero-balance entries must be included");

    let btc_name = AssetNameExchange::new("BTC");
    let btc = converted
        .iter()
        .find(|entry| entry.asset == btc_name)
        .unwrap();
    assert_eq!(btc.balance.total, Decimal::ZERO);
    assert_eq!(btc.balance.free, Decimal::ZERO);
}
/// Duplicate asset rows in the response are passed through without de-duplication.
#[test]
fn test_filter_balances_duplicate_assets_in_response() {
    let first = make_balance("BTC", "1.0", "0.0");
    let second = make_balance("BTC", "2.0", "0.0");

    let converted = filter_and_convert_balances(vec![first, second], &[]);

    assert_eq!(
        converted.len(),
        2,
        "duplicate asset entries produce two AssetBalance entries"
    );
}
/// Builds a REST `myTrades` row for a buy fill with id, order id, price, qty, commission and time.
///
/// Every other field is left at its `Default` value.
fn make_base_trade() -> binance_sdk::spot::rest_api::MyTradesResponseInner {
    use binance_sdk::spot::rest_api::MyTradesResponseInner;

    MyTradesResponseInner {
        id: Some(9001),
        order_id: Some(4242),
        price: Some(String::from("50000.00")),
        qty: Some(String::from("0.01")),
        commission: Some(String::from("0.05")),
        time: Some(1_700_000_000_000),
        is_buyer: Some(true),
        ..MyTradesResponseInner::default()
    }
}
/// A fully populated trade row must convert with instrument, side, price and quantity intact.
#[test]
fn test_convert_my_trade_happy_path() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let raw = make_base_trade();

    let trade = convert_my_trade(&raw, &instrument).expect("valid trade should convert");

    let expected_price = Decimal::from_str("50000.00").unwrap();
    let expected_quantity = Decimal::from_str("0.01").unwrap();
    assert_eq!(trade.instrument, instrument);
    assert_eq!(trade.side, Side::Buy);
    assert_eq!(trade.price, expected_price);
    assert_eq!(trade.quantity, expected_quantity);
}
/// `is_buyer == false` must map to `Side::Sell`.
#[test]
fn test_convert_my_trade_sell_side() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_trade();
    raw.is_buyer = Some(false);

    let trade = convert_my_trade(&raw, &instrument).expect("sell-side trade should convert");

    assert_eq!(trade.side, Side::Sell);
}
/// A trade row without an id must not convert.
#[test]
fn test_convert_my_trade_missing_id_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_trade();
    raw.id = None;

    let converted = convert_my_trade(&raw, &instrument);

    assert!(converted.is_none(), "missing id should return None");
}
/// A trade row without an order id must not convert.
#[test]
fn test_convert_my_trade_missing_order_id_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_trade();
    raw.order_id = None;

    let converted = convert_my_trade(&raw, &instrument);

    assert!(converted.is_none(), "missing orderId should return None");
}
/// A trade row without the `is_buyer` flag must not convert.
#[test]
fn test_convert_my_trade_missing_is_buyer_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_trade();
    raw.is_buyer = None;

    let converted = convert_my_trade(&raw, &instrument);

    assert!(converted.is_none(), "missing isBuyer should return None");
}
/// A trade row without a commission must still convert, reporting zero fees.
#[test]
fn test_convert_my_trade_commission_none_defaults_to_zero() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_trade();
    raw.commission = None;

    let trade = convert_my_trade(&raw, &instrument).expect("None commission should still convert");

    assert_eq!(trade.fees.fees, Decimal::ZERO);
}
/// Builds a REST order row for an unfilled GTC limit buy.
///
/// Every field not listed here is left at its `Default` value.
fn make_base_open_order() -> binance_sdk::spot::rest_api::AllOrdersResponseInner {
    use binance_sdk::spot::rest_api::AllOrdersResponseInner;

    let text = |value: &str| Some(value.to_owned());
    AllOrdersResponseInner {
        order_id: Some(12345),
        client_order_id: text("cid-abc"),
        side: text("BUY"),
        r#type: text("LIMIT"),
        price: text("50000.00"),
        orig_qty: text("0.01"),
        executed_qty: text("0.0"),
        time_in_force: text("GTC"),
        time: Some(1_700_000_000_000),
        ..AllOrdersResponseInner::default()
    }
}
/// A fully populated order row must convert with key, side, kind, price, quantity and fill intact.
#[test]
fn test_convert_open_order_happy_path() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let raw = make_base_open_order();

    let order = convert_open_order(&raw, &instrument).expect("valid order should convert");

    let expected_price = Decimal::from_str("50000.00").unwrap();
    let expected_quantity = Decimal::from_str("0.01").unwrap();
    assert_eq!(order.key.instrument, instrument);
    assert_eq!(order.side, Side::Buy);
    assert_eq!(order.kind, OrderKind::Limit);
    assert_eq!(order.price, Some(expected_price));
    assert_eq!(order.quantity, expected_quantity);
    assert_eq!(order.state.filled_quantity, Decimal::ZERO);
}
/// An order row without an order id must not convert.
#[test]
fn test_convert_open_order_missing_order_id_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_open_order();
    raw.order_id = None;

    let converted = convert_open_order(&raw, &instrument);

    assert!(converted.is_none(), "missing orderId should return None");
}
/// An order row without a side must not convert.
#[test]
fn test_convert_open_order_missing_side_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_open_order();
    raw.side = None;

    let converted = convert_open_order(&raw, &instrument);

    assert!(converted.is_none(), "missing side should return None");
}
/// An order row without an order type must not convert.
#[test]
fn test_convert_open_order_missing_type_returns_none() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_open_order();
    raw.r#type = None;

    let converted = convert_open_order(&raw, &instrument);

    assert!(converted.is_none(), "missing type should return None");
}
/// An order row without an executed quantity must still convert, with zero filled quantity.
#[test]
fn test_convert_open_order_executed_qty_none_defaults_to_zero() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_open_order();
    raw.executed_qty = None;

    let order =
        convert_open_order(&raw, &instrument).expect("None executedQty should still convert");

    let filled = order.state.filled_quantity;
    assert_eq!(
        filled,
        Decimal::ZERO,
        "None executedQty should default to zero"
    );
}
/// An order row with a garbled executed quantity must still convert, with zero filled quantity.
#[test]
fn test_convert_open_order_executed_qty_unparseable_defaults_to_zero() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");
    let mut raw = make_base_open_order();
    raw.executed_qty = Some(String::from("bad-value"));

    let order = convert_open_order(&raw, &instrument)
        .expect("unparseable executedQty should still convert");

    let filled = order.state.filled_quantity;
    assert_eq!(
        filled,
        Decimal::ZERO,
        "unparseable executedQty should default to zero"
    );
}
// Verifies that `dedup_key_from_event` yields `None` for order snapshots whose state is
// `OpenInFlight`, `CancelInFlight`, or inactive (`FullyFilled`).
//
// NOTE(review): the positional arguments to `Order::new` below are presumably
// (key, side, price, quantity, kind, time_in_force, state) — confirm against `Order::new`.
#[test]
fn test_dedup_key_from_event_non_open_states_return_none() {
use crate::order::state::{
ActiveOrderState, CancelInFlight, InactiveOrderState, OpenInFlight,
};
use rustrade_integration::collection::snapshot::Snapshot;
// One key is shared by all three snapshots; it is cloned for the first two and moved into the last.
let key = OrderKey::new(
ExchangeId::BinanceSpot,
InstrumentNameExchange::new("BTCUSDT"),
StrategyId::unknown(),
ClientOrderId::new("cid1"),
);
// Case 1: active order in the `OpenInFlight` state.
let event = UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderSnapshot(Snapshot(Order::new(
key.clone(),
Side::Buy,
None, Decimal::ZERO,
OrderKind::Market,
TimeInForce::ImmediateOrCancel,
OrderState::<AssetNameExchange, InstrumentNameExchange>::Active(
ActiveOrderState::OpenInFlight(OpenInFlight),
),
))),
);
assert!(
dedup_key_from_event(&event).is_none(),
"OpenInFlight should return None — dedup not meaningful before exchange ack"
);
// Case 2: active order in the `CancelInFlight` state with no inner order attached.
let event = UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderSnapshot(Snapshot(Order::new(
key.clone(),
Side::Buy,
None, Decimal::ZERO,
OrderKind::Market,
TimeInForce::ImmediateOrCancel,
OrderState::<AssetNameExchange, InstrumentNameExchange>::Active(
ActiveOrderState::CancelInFlight(CancelInFlight { order: None }),
),
))),
);
assert!(
dedup_key_from_event(&event).is_none(),
"CancelInFlight should return None"
);
// Case 3: inactive order in the `FullyFilled` state.
// NOTE(review): `Filled::new` arguments are presumably (order id, time, filled quantity,
// optional average price) — confirm against `Filled::new`.
let event = UnindexedAccountEvent::new(
ExchangeId::BinanceSpot,
AccountEventKind::OrderSnapshot(Snapshot(Order::new(
key,
Side::Buy,
None, Decimal::ONE,
OrderKind::Market,
TimeInForce::ImmediateOrCancel,
OrderState::<AssetNameExchange, InstrumentNameExchange>::Inactive(
InactiveOrderState::FullyFilled(crate::order::state::Filled::new(
OrderId::new("123"),
Utc::now(),
Decimal::ONE, None,
)),
),
))),
);
assert!(
dedup_key_from_event(&event).is_none(),
"Inactive state should return None"
);
}
/// An `OrderCancelled` event carrying an `Err` state must not produce a dedup key.
#[test]
fn test_dedup_key_from_event_cancelled_error_returns_none() {
    let mut rejected = make_base_report();
    rejected.x = Some("REJECTED".to_owned());
    rejected.r = Some("INSUFFICIENT_FUNDS".to_owned());

    let event = convert_execution_report(rejected).expect("REJECTED report should produce Some");

    // Guard: make sure the fixture really is the event shape this test is about.
    let is_failed_cancel = matches!(
        &event.kind,
        AccountEventKind::OrderCancelled(response) if response.state.is_err()
    );
    assert!(
        is_failed_cancel,
        "prerequisite: event is OrderCancelled with Err"
    );

    let dedup_key = dedup_key_from_event(&event);
    assert!(
        dedup_key.is_none(),
        "OrderCancelled with Err state should return None"
    );
}
/// An `ExecutionReport` stream event must push one `OrderSnapshot` and not signal termination.
#[test]
fn test_convert_user_data_events_execution_report_pushes_to_buf() {
    use binance_sdk::spot::websocket_api::ExecutionReport;

    let text = |value: &str| Some(value.to_owned());
    let report = ExecutionReport {
        x: text("NEW"),
        s_uppercase: text("BUY"),
        o: text("LIMIT"),
        p: text("50000.00"),
        q: text("0.01"),
        f: text("GTC"),
        z: text("0"),
        ..make_base_report()
    };
    let stream_event = UserDataStreamEventsResponse::ExecutionReport(Box::new(report));

    let mut events = Vec::new();
    let terminated = convert_user_data_events(stream_event, &mut events);

    assert!(
        !terminated,
        "ExecutionReport should not signal stream termination"
    );
    assert_eq!(events.len(), 1, "ExecutionReport should push one event");
    let first_is_snapshot = matches!(events[0].kind, AccountEventKind::OrderSnapshot(_));
    assert!(first_is_snapshot);
}
/// An `OutboundAccountPosition` stream event with one balance must push one event and keep going.
#[test]
fn test_convert_user_data_events_account_position_pushes_to_buf() {
    use binance_sdk::spot::websocket_api::OutboundAccountPosition;

    let balances = vec![make_balance_inner("BTC", "1.0", "0.0")];
    let update = OutboundAccountPosition {
        u: Some(1_700_000_000_000),
        b_uppercase: Some(balances),
        ..Default::default()
    };
    let stream_event = UserDataStreamEventsResponse::OutboundAccountPosition(Box::new(update));

    let mut events = Vec::new();
    let terminated = convert_user_data_events(stream_event, &mut events);

    assert!(
        !terminated,
        "OutboundAccountPosition should not signal stream termination"
    );
    assert_eq!(
        events.len(),
        1,
        "OutboundAccountPosition should push one balance event"
    );
}
/// A `BalanceUpdate` stream event must push nothing and must not signal termination.
#[test]
fn test_convert_user_data_events_balance_update_ignored() {
    let update = binance_sdk::spot::websocket_api::BalanceUpdate::default();
    let stream_event = UserDataStreamEventsResponse::BalanceUpdate(Box::new(update));

    let mut events = Vec::new();
    let terminated = convert_user_data_events(stream_event, &mut events);

    assert!(
        !terminated,
        "BalanceUpdate should not signal stream termination"
    );
    let nothing_emitted = events.is_empty();
    assert!(nothing_emitted, "BalanceUpdate should push no events");
}
/// An `EventStreamTerminated` stream event must signal termination and push nothing.
#[test]
fn test_convert_user_data_events_stream_terminated_signals_reconnect() {
    let stream_event = UserDataStreamEventsResponse::EventStreamTerminated(Default::default());

    let mut events = Vec::new();
    let terminated = convert_user_data_events(stream_event, &mut events);

    assert!(
        terminated,
        "EventStreamTerminated must signal stream termination"
    );
    let nothing_emitted = events.is_empty();
    assert!(
        nothing_emitted,
        "EventStreamTerminated should push no events"
    );
}
/// A fresh tracker has no cooldown, so `wait_if_blocked` must finish within the 1 ms budget.
#[tokio::test]
async fn test_rate_limit_tracker_not_blocked_initially() {
    tokio::time::pause();
    let probe_window = std::time::Duration::from_millis(1);
    let tracker = RateLimitTracker::new();

    let probe = tokio::time::timeout(probe_window, tracker.wait_if_blocked());
    probe
        .await
        .expect("wait_if_blocked should return immediately when not blocked");
}
/// After a 5 s rate limit, `wait_if_blocked` must block until the cooldown has elapsed.
#[tokio::test]
async fn test_rate_limit_tracker_blocks_until_deadline() {
    tokio::time::pause();
    let probe_window = Duration::from_millis(1);
    let cooldown = Duration::from_secs(5);

    let tracker = RateLimitTracker::new();
    tracker.on_rate_limited(Some(cooldown));

    // While the cooldown is active the probe must time out.
    let during = tokio::time::timeout(probe_window, tracker.wait_if_blocked()).await;
    assert!(
        during.is_err(),
        "wait_if_blocked should block while cooldown is active"
    );

    // Step the paused clock just past the cooldown; the probe must now complete.
    tokio::time::advance(cooldown + Duration::from_millis(1)).await;
    tokio::time::timeout(probe_window, tracker.wait_if_blocked())
        .await
        .expect("wait_if_blocked should return after cooldown expires");
}
/// A second, longer rate limit (10 s after 5 s) must extend the cooldown to the longer one.
#[tokio::test]
async fn test_rate_limit_tracker_cooldown_extends_to_max() {
    tokio::time::pause();
    let probe_window = Duration::from_millis(1);

    let tracker = RateLimitTracker::new();
    for secs in [5, 10] {
        tracker.on_rate_limited(Some(Duration::from_secs(secs)));
    }

    // 6 s in: past the first cooldown, still inside the second.
    tokio::time::advance(Duration::from_secs(6)).await;
    let still_blocked = tokio::time::timeout(probe_window, tracker.wait_if_blocked())
        .await
        .is_err();
    assert!(still_blocked, "cooldown should have been extended to 10s");

    // A further 5 s takes the clock beyond the 10 s cooldown.
    tokio::time::advance(Duration::from_secs(5)).await;
    tokio::time::timeout(probe_window, tracker.wait_if_blocked())
        .await
        .expect("wait_if_blocked should return after extended cooldown expires");
}
/// A second, shorter rate limit (2 s after 10 s) must not shorten the existing cooldown.
#[tokio::test]
async fn test_rate_limit_tracker_shorter_cooldown_does_not_shorten() {
    tokio::time::pause();
    let probe_window = Duration::from_millis(1);

    let tracker = RateLimitTracker::new();
    for secs in [10, 2] {
        tracker.on_rate_limited(Some(Duration::from_secs(secs)));
    }

    // 3 s in: past the shorter cooldown, still inside the original 10 s one.
    tokio::time::advance(Duration::from_secs(3)).await;
    let still_blocked = tokio::time::timeout(probe_window, tracker.wait_if_blocked())
        .await
        .is_err();
    assert!(
        still_blocked,
        "shorter on_rate_limited must not shorten existing cooldown"
    );
}
/// "Insufficient balance" must parse to `BalanceInsufficient` whose first field names the
/// instrument that was passed in.
#[test]
fn test_parse_binance_api_error_balance_insufficient_holds_instrument_name() {
    let instrument = InstrumentNameExchange::new("BTCUSDT");

    let asset_field = match parse_binance_api_error("Insufficient balance".into(), &instrument) {
        ApiError::BalanceInsufficient(asset_field, _) => asset_field,
        other => panic!("expected BalanceInsufficient, got {other:?}"),
    };

    assert_eq!(
        asset_field.name().as_str(),
        "BTCUSDT",
        "BalanceInsufficient.0 holds the instrument name, not an asset name"
    );
}
/// `is_api_rejection_error` must recognise a `ResponseError` both bare and behind a context layer.
#[test]
fn test_is_api_rejection_error_with_wrapped_error_chain() {
    let raw = anyhow::anyhow!(WebsocketError::ResponseError {
        code: -2010,
        message: "insufficient balance".into(),
    });
    let detected_bare = is_api_rejection_error(&raw);
    assert!(
        detected_bare,
        "unwrapped ResponseError at root must be detected"
    );

    // Attaching context consumes `raw`, so the bare check has to come first.
    let wrapped = raw.context("outer context (e.g. SDK adds context layer)");
    let detected_wrapped = is_api_rejection_error(&wrapped);
    assert!(
        detected_wrapped,
        "context-wrapped ResponseError must still be detected — anyhow::downcast_ref searches the full chain"
    );
}
}