use std::{
fmt::Debug,
num::NonZeroU32,
sync::{
Arc, LazyLock, Mutex,
atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
},
};
use arc_swap::ArcSwap;
use nautilus_common::live::get_runtime;
use nautilus_core::string::REDACTED;
use nautilus_network::{
mode::ConnectionMode,
ratelimiter::quota::Quota,
websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
};
use tokio_util::sync::CancellationToken;
use ustr::Ustr;
use super::{
error::{BinanceWsApiError, BinanceWsApiResult},
handler::BinanceSpotWsTradingHandler,
messages::{BinanceSpotWsTradingCommand, BinanceSpotWsTradingMessage},
};
use crate::{
common::{
consts::{BINANCE_API_KEY_HEADER, BINANCE_SPOT_SBE_WS_API_URL},
credential::SigningCredential,
},
spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
};
pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";
pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
pub static BINANCE_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
LazyLock::new(|| [Ustr::from("order")]);
#[allow(clippy::missing_panics_doc)]
#[must_use]
pub fn binance_ws_order_quota() -> Quota {
Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
}
#[derive(Clone)]
pub struct BinanceSpotWsTradingClient {
url: String,
credential: Arc<SigningCredential>,
heartbeat: Option<u64>,
signal: Arc<AtomicBool>,
connection_mode: Arc<ArcSwap<AtomicU8>>,
cmd_tx:
Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingCommand>>>,
out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingMessage>>>>,
task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
request_id_counter: Arc<AtomicU64>,
cancellation_token: CancellationToken,
}
impl Debug for BinanceSpotWsTradingClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(BinanceSpotWsTradingClient))
.field("url", &self.url)
.field("credential", &REDACTED)
.field("heartbeat", &self.heartbeat)
.finish_non_exhaustive()
}
}
impl BinanceSpotWsTradingClient {
#[must_use]
pub fn new(
url: Option<String>,
api_key: String,
api_secret: String,
heartbeat: Option<u64>,
) -> Self {
let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
let credential = Arc::new(SigningCredential::new(api_key, api_secret));
let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
Self {
url,
credential,
heartbeat,
signal: Arc::new(AtomicBool::new(false)),
connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
ConnectionMode::Closed as u8,
)))),
cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
out_rx: Arc::new(Mutex::new(None)),
task_handle: None,
request_id_counter: Arc::new(AtomicU64::new(1)),
cancellation_token: CancellationToken::new(),
}
}
pub fn with_env(
url: Option<String>,
api_key: Option<String>,
api_secret: Option<String>,
heartbeat: Option<u64>,
) -> anyhow::Result<Self> {
let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
Ok(Self::new(url, api_key, api_secret, heartbeat))
}
pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
Self::with_env(url, None, None, heartbeat)
}
#[must_use]
pub fn is_active(&self) -> bool {
let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
mode_u8 == ConnectionMode::Active as u8
}
#[must_use]
pub fn is_closed(&self) -> bool {
let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
mode_u8 == ConnectionMode::Closed as u8
}
pub fn next_request_id(&self) -> String {
let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
format!("req-{id}")
}
#[allow(clippy::missing_panics_doc)]
pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
self.signal.store(false, Ordering::Relaxed);
self.cancellation_token = CancellationToken::new();
let (raw_handler, raw_rx) = channel_message_handler();
let ping_handler: PingHandler = Arc::new(move |_| {});
let headers = vec![(
BINANCE_API_KEY_HEADER.to_string(),
self.credential.api_key().to_string(),
)];
let config = WebSocketConfig {
url: self.url.clone(),
headers,
heartbeat: self.heartbeat,
heartbeat_msg: None,
reconnect_timeout_ms: Some(5_000),
reconnect_delay_initial_ms: Some(500),
reconnect_delay_max_ms: Some(5_000),
reconnect_backoff_factor: Some(2.0),
reconnect_jitter_ms: Some(250),
reconnect_max_attempts: None,
idle_timeout_ms: None,
};
let keyed_quotas = vec![(
BINANCE_WS_RATE_LIMIT_KEY_ORDER[0].as_str().to_string(),
binance_ws_order_quota(),
)];
let client = WebSocketClient::connect(
config,
Some(raw_handler),
Some(ping_handler),
None,
keyed_quotas,
Some(binance_ws_order_quota()), )
.await
.map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
self.connection_mode.store(client.connection_mode_atomic());
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
{
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
*rx_guard = Some(out_rx);
}
{
let mut tx_guard = self.cmd_tx.write().await;
*tx_guard = cmd_tx;
}
let signal = self.signal.clone();
let credential = self.credential.clone();
let mut handler =
BinanceSpotWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
self.cmd_tx
.read()
.await
.send(BinanceSpotWsTradingCommand::SetClient(client))
.map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
let cancellation_token = self.cancellation_token.clone();
let handle = get_runtime().spawn(async move {
tokio::select! {
() = cancellation_token.cancelled() => {
log::debug!("Handler task cancelled");
}
_ = handler.run() => {
log::debug!("Handler run completed");
}
}
});
self.task_handle = Some(Arc::new(handle));
Ok(())
}
pub async fn disconnect(&mut self) {
self.signal.store(true, Ordering::Relaxed);
if let Err(e) = self
.cmd_tx
.read()
.await
.send(BinanceSpotWsTradingCommand::Disconnect)
{
log::warn!("Failed to send disconnect command: {e}");
}
self.cancellation_token.cancel();
if let Some(handle) = self.task_handle.take()
&& let Ok(handle) = Arc::try_unwrap(handle)
{
let _ = handle.await;
}
}
pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
let id = self.next_request_id();
self.place_order_with_id(id.clone(), params).await?;
Ok(id)
}
pub async fn place_order_with_id(
&self,
id: String,
params: NewOrderParams,
) -> BinanceWsApiResult<()> {
let cmd = BinanceSpotWsTradingCommand::PlaceOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
let id = self.next_request_id();
self.cancel_order_with_id(id.clone(), params).await?;
Ok(id)
}
pub async fn cancel_order_with_id(
&self,
id: String,
params: CancelOrderParams,
) -> BinanceWsApiResult<()> {
let cmd = BinanceSpotWsTradingCommand::CancelOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn cancel_replace_order(
&self,
params: CancelReplaceOrderParams,
) -> BinanceWsApiResult<String> {
let id = self.next_request_id();
self.cancel_replace_order_with_id(id.clone(), params)
.await?;
Ok(id)
}
pub async fn cancel_replace_order_with_id(
&self,
id: String,
params: CancelReplaceOrderParams,
) -> BinanceWsApiResult<()> {
let cmd = BinanceSpotWsTradingCommand::CancelReplaceOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
let id = self.next_request_id();
let cmd = BinanceSpotWsTradingCommand::CancelAllOrders {
id: id.clone(),
symbol: symbol.into(),
};
self.send_cmd(cmd).await?;
Ok(id)
}
pub async fn recv(&self) -> Option<BinanceSpotWsTradingMessage> {
let rx_opt = {
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
rx_guard.take()
};
if let Some(mut rx) = rx_opt {
let result = rx.recv().await;
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
*rx_guard = Some(rx);
result
} else {
None
}
}
pub async fn session_logon(&self) -> BinanceWsApiResult<()> {
self.send_cmd(BinanceSpotWsTradingCommand::SessionLogon)
.await
}
pub async fn subscribe_user_data(&self) -> BinanceWsApiResult<()> {
self.send_cmd(BinanceSpotWsTradingCommand::SubscribeUserData)
.await
}
async fn send_cmd(&self, cmd: BinanceSpotWsTradingCommand) -> BinanceWsApiResult<()> {
self.cmd_tx
.read()
.await
.send(cmd)
.map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
}
}