use std::{
fmt::Debug,
num::NonZeroU32,
sync::{
Arc, LazyLock, Mutex,
atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
},
};
use arc_swap::ArcSwap;
use nautilus_common::live::get_runtime;
use nautilus_core::string::REDACTED;
use nautilus_network::{
mode::ConnectionMode,
ratelimiter::quota::Quota,
websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
};
use tokio_util::sync::CancellationToken;
use ustr::Ustr;
use super::{
error::{BinanceFuturesWsApiError, BinanceFuturesWsApiResult},
handler::BinanceFuturesWsTradingHandler,
messages::{BinanceFuturesWsTradingCommand, BinanceFuturesWsTradingMessage},
};
use crate::{
common::{
consts::{BINANCE_API_KEY_HEADER, BINANCE_FUTURES_USD_WS_API_URL},
credential::SigningCredential,
},
futures::http::query::{
BinanceCancelOrderParams, BinanceModifyOrderParams, BinanceNewOrderParams,
},
};
pub static BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
LazyLock::new(|| [Ustr::from("futures_order")]);
#[allow(clippy::missing_panics_doc)]
#[must_use]
pub fn binance_futures_ws_order_quota() -> Quota {
Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
}
#[derive(Clone)]
pub struct BinanceFuturesWsTradingClient {
url: String,
credential: Arc<SigningCredential>,
heartbeat: Option<u64>,
signal: Arc<AtomicBool>,
connection_mode: Arc<ArcSwap<AtomicU8>>,
cmd_tx: Arc<
tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingCommand>>,
>,
out_rx:
Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingMessage>>>>,
task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
request_id_counter: Arc<AtomicU64>,
cancellation_token: CancellationToken,
}
impl Debug for BinanceFuturesWsTradingClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(BinanceFuturesWsTradingClient))
.field("url", &self.url)
.field("credential", &REDACTED)
.field("heartbeat", &self.heartbeat)
.finish_non_exhaustive()
}
}
impl BinanceFuturesWsTradingClient {
#[must_use]
pub fn new(
url: Option<String>,
api_key: String,
api_secret: String,
heartbeat: Option<u64>,
) -> Self {
let url = url.unwrap_or_else(|| BINANCE_FUTURES_USD_WS_API_URL.to_string());
let credential = Arc::new(SigningCredential::new(api_key, api_secret));
let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
Self {
url,
credential,
heartbeat,
signal: Arc::new(AtomicBool::new(false)),
connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
ConnectionMode::Closed as u8,
)))),
cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
out_rx: Arc::new(Mutex::new(None)),
task_handle: None,
request_id_counter: Arc::new(AtomicU64::new(1)),
cancellation_token: CancellationToken::new(),
}
}
#[must_use]
pub fn is_active(&self) -> bool {
let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
mode_u8 == ConnectionMode::Active as u8
}
#[must_use]
pub fn is_closed(&self) -> bool {
let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
mode_u8 == ConnectionMode::Closed as u8
}
pub fn next_request_id(&self) -> String {
let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
format!("req-{id}")
}
#[allow(clippy::missing_panics_doc)]
pub async fn connect(&mut self) -> BinanceFuturesWsApiResult<()> {
self.signal.store(false, Ordering::Relaxed);
self.cancellation_token = CancellationToken::new();
let (raw_handler, raw_rx) = channel_message_handler();
let ping_handler: PingHandler = Arc::new(move |_| {});
let headers = vec![(
BINANCE_API_KEY_HEADER.to_string(),
self.credential.api_key().to_string(),
)];
let config = WebSocketConfig {
url: self.url.clone(),
headers,
heartbeat: self.heartbeat,
heartbeat_msg: None,
reconnect_timeout_ms: Some(5_000),
reconnect_delay_initial_ms: Some(500),
reconnect_delay_max_ms: Some(5_000),
reconnect_backoff_factor: Some(2.0),
reconnect_jitter_ms: Some(250),
reconnect_max_attempts: None,
idle_timeout_ms: None,
};
let keyed_quotas = vec![(
BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER[0]
.as_str()
.to_string(),
binance_futures_ws_order_quota(),
)];
let client = WebSocketClient::connect(
config,
Some(raw_handler),
Some(ping_handler),
None,
keyed_quotas,
Some(binance_futures_ws_order_quota()),
)
.await
.map_err(|e| BinanceFuturesWsApiError::ConnectionError(e.to_string()))?;
self.connection_mode.store(client.connection_mode_atomic());
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
{
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
*rx_guard = Some(out_rx);
}
{
let mut tx_guard = self.cmd_tx.write().await;
*tx_guard = cmd_tx;
}
let signal = self.signal.clone();
let credential = self.credential.clone();
let mut handler =
BinanceFuturesWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
self.cmd_tx
.read()
.await
.send(BinanceFuturesWsTradingCommand::SetClient(client))
.map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))?;
let cancellation_token = self.cancellation_token.clone();
let handle = get_runtime().spawn(async move {
tokio::select! {
() = cancellation_token.cancelled() => {
log::debug!("Handler task cancelled");
}
_ = handler.run() => {
log::debug!("Handler run completed");
}
}
});
self.task_handle = Some(Arc::new(handle));
Ok(())
}
pub async fn disconnect(&mut self) {
self.signal.store(true, Ordering::Relaxed);
if let Err(e) = self
.cmd_tx
.read()
.await
.send(BinanceFuturesWsTradingCommand::Disconnect)
{
log::warn!("Failed to send disconnect command: {e}");
}
self.cancellation_token.cancel();
if let Some(handle) = self.task_handle.take()
&& let Ok(handle) = Arc::try_unwrap(handle)
{
let _ = handle.await;
}
}
pub async fn place_order(
&self,
params: BinanceNewOrderParams,
) -> BinanceFuturesWsApiResult<String> {
let id = self.next_request_id();
self.place_order_with_id(id.clone(), params).await?;
Ok(id)
}
pub async fn place_order_with_id(
&self,
id: String,
params: BinanceNewOrderParams,
) -> BinanceFuturesWsApiResult<()> {
let cmd = BinanceFuturesWsTradingCommand::PlaceOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn cancel_order(
&self,
params: BinanceCancelOrderParams,
) -> BinanceFuturesWsApiResult<String> {
let id = self.next_request_id();
self.cancel_order_with_id(id.clone(), params).await?;
Ok(id)
}
pub async fn cancel_order_with_id(
&self,
id: String,
params: BinanceCancelOrderParams,
) -> BinanceFuturesWsApiResult<()> {
let cmd = BinanceFuturesWsTradingCommand::CancelOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn modify_order(
&self,
params: BinanceModifyOrderParams,
) -> BinanceFuturesWsApiResult<String> {
let id = self.next_request_id();
self.modify_order_with_id(id.clone(), params).await?;
Ok(id)
}
pub async fn modify_order_with_id(
&self,
id: String,
params: BinanceModifyOrderParams,
) -> BinanceFuturesWsApiResult<()> {
let cmd = BinanceFuturesWsTradingCommand::ModifyOrder { id, params };
self.send_cmd(cmd).await
}
pub async fn cancel_all_orders(
&self,
symbol: impl Into<String>,
) -> BinanceFuturesWsApiResult<String> {
let id = self.next_request_id();
let cmd = BinanceFuturesWsTradingCommand::CancelAllOrders {
id: id.clone(),
symbol: symbol.into(),
};
self.send_cmd(cmd).await?;
Ok(id)
}
pub async fn recv(&self) -> Option<BinanceFuturesWsTradingMessage> {
let rx_opt = {
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
rx_guard.take()
};
if let Some(mut rx) = rx_opt {
let result = rx.recv().await;
let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
*rx_guard = Some(rx);
result
} else {
None
}
}
async fn send_cmd(&self, cmd: BinanceFuturesWsTradingCommand) -> BinanceFuturesWsApiResult<()> {
self.cmd_tx
.read()
.await
.send(cmd)
.map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))
}
}