use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::Duration,
};
use futures_util::future;
use nautilus_common::live::get_runtime;
use nautilus_model::{
enums::OrderSide,
identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
instruments::InstrumentAny,
reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};
use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
// BitMEX error substrings indicating the targeted order is already gone, so a
// failed cancel can be treated as an idempotent success rather than an error.
const IDEMPOTENT_ALREADY_CANCELED: &str = "AlreadyCanceled";
const IDEMPOTENT_ORDER_NOT_FOUND: &str = "orderID not found";
const IDEMPOTENT_UNABLE_DUE_TO_STATE: &str = "Unable to cancel order due to existing state";
/// Abstraction over the cancel-related HTTP operations used by the broadcaster,
/// allowing the real `BitmexHttpClient` to be swapped for a mock in tests.
trait CancelExecutor: Send + Sync {
    /// Caches an instrument definition on the underlying client.
    fn add_instrument(&self, instrument: InstrumentAny);
    /// Performs a lightweight liveness probe against the venue.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
    /// Cancels a single order, identified by client and/or venue order ID.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
    /// Cancels a batch of orders in a single request.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
    /// Cancels all open orders for an instrument, optionally filtered by side.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}
/// Production executor: delegates each trait method to the client's inherent
/// method of the same name. The fully-qualified `Self::method(self, ...)` form
/// is used deliberately so the inherent method is selected rather than
/// recursing into this trait impl.
impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            // Server-time request doubles as a cheap liveness probe; the
            // timestamp itself is discarded.
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}
/// Configuration for [`CancelBroadcaster`].
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of independent HTTP clients in the broadcast pool.
    pub pool_size: usize,
    /// BitMEX API key.
    pub api_key: Option<String>,
    /// BitMEX API secret.
    pub api_secret: Option<String>,
    /// Override base URL; when `None` and `testnet` is set, the testnet URL is used.
    pub base_url: Option<String>,
    /// Whether to target the BitMEX testnet when no explicit `base_url` is given.
    pub testnet: bool,
    /// Per-request HTTP timeout in seconds.
    pub timeout_secs: u64,
    /// Maximum retry attempts per request.
    pub max_retries: u32,
    /// Initial retry backoff delay in milliseconds.
    pub retry_delay_ms: u64,
    /// Maximum retry backoff delay in milliseconds.
    pub retry_delay_max_ms: u64,
    /// Receive window in milliseconds passed to the HTTP client.
    pub recv_window_ms: u64,
    /// Per-client rate limit in requests per second.
    pub max_requests_per_second: u32,
    /// Per-client rate limit in requests per minute.
    pub max_requests_per_minute: u32,
    /// Interval between background health-check sweeps in seconds.
    pub health_check_interval_secs: u64,
    /// Timeout for a single health-check probe in seconds.
    pub health_check_timeout_secs: u64,
    /// Error substrings treated as expected (benign) rejections.
    pub expected_reject_patterns: Vec<String>,
    /// Error substrings treated as idempotent success (order already gone).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional proxy URL per pool slot, index-aligned with the clients.
    pub proxy_urls: Vec<Option<String>>,
}
impl Default for CancelBroadcasterConfig {
    /// Returns a conservative default configuration: a pool of two mainnet
    /// clients with the standard BitMEX reject/idempotency patterns.
    fn default() -> Self {
        let expected_reject_patterns =
            vec!["Order had execInst of ParticipateDoNotInitiate".to_string()];
        let idempotent_success_patterns = vec![
            IDEMPOTENT_ALREADY_CANCELED.to_string(),
            IDEMPOTENT_ORDER_NOT_FOUND.to_string(),
            IDEMPOTENT_UNABLE_DUE_TO_STATE.to_string(),
        ];
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: 60,
            max_retries: 3,
            retry_delay_ms: 1_000,
            retry_delay_max_ms: 5_000,
            recv_window_ms: 10_000,
            max_requests_per_second: 10,
            max_requests_per_minute: 120,
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns,
            idempotent_success_patterns,
            proxy_urls: Vec::new(),
        }
    }
}
/// One pooled transport: a cancel executor plus per-client health and metric
/// state. Cloning is cheap — all state is behind `Arc`s shared by clones.
#[derive(Clone)]
struct TransportClient {
    // Trait object so tests can substitute a mock executor.
    executor: Arc<dyn CancelExecutor>,
    // Stable identifier used in log messages and client stats.
    client_id: String,
    // Flipped by health checks and request outcomes.
    healthy: Arc<AtomicBool>,
    // Total cancel requests attempted through this client.
    cancel_count: Arc<AtomicU64>,
    // Total failed cancel requests through this client.
    error_count: Arc<AtomicU64>,
}
impl Debug for TransportClient {
    /// Manual `Debug`: the `executor` trait object is not `Debug`, so only the
    /// identifying and metric fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct(stringify!(TransportClient));
        dbg.field("client_id", &self.client_id);
        dbg.field("healthy", &self.healthy);
        dbg.field("cancel_count", &self.cancel_count);
        dbg.field("error_count", &self.error_count);
        dbg.finish()
    }
}
impl TransportClient {
    /// Wraps `executor` with fresh (healthy, zeroed) per-client state.
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns the last observed health state.
    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    /// Flags this client as healthy.
    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    /// Flags this client as unhealthy.
    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    /// Returns the number of cancel requests attempted through this client.
    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    /// Returns the number of failed cancel requests through this client.
    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Probes the executor with a deadline of `timeout_secs` seconds, updates
    /// the health flag accordingly, and returns the new health state.
    async fn health_check(&self, timeout_secs: u64) -> bool {
        let deadline = Duration::from_secs(timeout_secs);
        let outcome = tokio::time::timeout(deadline, self.executor.health_check()).await;
        match outcome {
            Ok(Ok(())) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                log::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    /// Cancels one order via the executor, bumping the per-client attempt
    /// counter up front and the error counter on failure. A success also marks
    /// the client healthy again.
    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);
        let result = self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await;
        if result.is_ok() {
            self.mark_healthy();
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
        result
    }
}
/// Broadcasts order-cancel requests across a pool of independent HTTP clients
/// in parallel and returns the first successful response, minimizing cancel
/// latency and tolerating individual client failures.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    // Shared with the background health-check task.
    transports: Arc<Vec<TransportClient>>,
    // Handle of the periodic health-check task, if started.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    // Aggregate counters snapshotted by `get_metrics`.
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}
impl CancelBroadcaster {
    /// Creates a broadcaster backed by `config.pool_size` independent HTTP clients.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying HTTP client fails to construct.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);
        // Testnet selection only applies when no explicit base URL was given.
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };
        for i in 0..config.pool_size {
            // Each pool slot may route through its own proxy for path diversity.
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }
        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Runs one concurrent health-check sweep over `transports` and logs a summary.
    ///
    /// Associated fn (no `&self`) so the spawned background task, which only
    /// owns an `Arc` of the transports, can share it with `run_health_checks`.
    async fn sweep_health(transports: &[TransportClient], timeout_secs: u64) {
        let checks: Vec<_> = transports
            .iter()
            .map(|t| t.health_check(timeout_secs))
            .collect();
        let results = future::join_all(checks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();
        log::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    /// Starts the periodic background health-check task. Idempotent: calling
    /// `start` while already running is a no-op.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }
        self.running.store(true, Ordering::Relaxed);
        // Run one sweep up front so health state is fresh before first use.
        self.run_health_checks().await;
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;
        let task = get_runtime().spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            // Don't burst-catch-up if the runtime stalls past a tick.
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                ticker.tick().await;
                if !running.load(Ordering::Relaxed) {
                    break;
                }
                Self::sweep_health(&transports, timeout_secs).await;
            }
        });
        *self.health_check_task.write().await = Some(task);
        log::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.len()
        );
        Ok(())
    }

    /// Stops the background health-check task. Idempotent.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }
        self.running.store(false, Ordering::Relaxed);
        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }
        log::info!("CancelBroadcaster stopped");
    }

    /// Runs a single immediate health-check sweep over all clients.
    async fn run_health_checks(&self) {
        Self::sweep_health(&self.transports, self.config.health_check_timeout_secs).await;
    }

    /// Returns `true` if `error_message` matches a configured expected-reject pattern.
    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Returns `true` if `error_message` indicates the order is already gone,
    /// so the failed cancel can be treated as an idempotent success.
    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Snapshots the currently-healthy transports, or bails (counting a failed
    /// cancel) when none are available. Shared by all broadcast entry points.
    fn healthy_transports(&self) -> anyhow::Result<Vec<TransportClient>> {
        let healthy: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        if healthy.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }
        Ok(healthy)
    }

    /// Awaits the racing broadcast tasks: returns the first success (or
    /// idempotent success) and aborts the rest; errors only if all tasks fail.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();
        // `select_all` panics on an empty set, so loop only while non-empty.
        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            // `select_all` already returns the remaining handles as a Vec.
            handles = remaining;
            match result {
                Ok((client_id, Ok(result))) => {
                    // First winner: abandon the slower duplicate requests.
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);
                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}");
                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();
                    if self.is_idempotent_success(&error_msg) {
                        // The order is already gone — treat as success.
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
                        log::debug!(
                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
                        );
                        return idempotent_result();
                    }
                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        log::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        log::warn!(
                            "{operation} request failed [{client_id}]: {error_msg} {params}"
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    log::warn!("{operation} task join error: {e:?}");
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }
        // Every client failed without an idempotent outcome.
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        log::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {errors:?}",
            operation.to_lowercase(),
        ))
    }

    /// Broadcasts a single-order cancel to every healthy client.
    ///
    /// Returns the first status report, or `Ok(None)` when the order was
    /// already cancelled/not found (idempotent success).
    ///
    /// # Errors
    ///
    /// Errors when no healthy client is available or every request fails.
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);
        let healthy_transports = self.healthy_transports()?;
        let mut handles = Vec::with_capacity(healthy_transports.len());
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some);
                (client_id, result)
            });
            handles.push(handle);
        }
        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
            "order already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a batch cancel to every healthy client; returns the first
    /// successful batch of reports (empty on idempotent success).
    ///
    /// # Errors
    ///
    /// Errors when no healthy client is available or every request fails.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);
        let healthy_transports = self.healthy_transports()?;
        let mut handles = Vec::with_capacity(healthy_transports.len());
        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                // NOTE(review): goes through the executor directly, so the
                // per-client cancel/error counters are not updated here
                // (unlike single-order cancels) — confirm this is intended.
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }
        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
            "orders already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a cancel-all to every healthy client; returns the first
    /// successful batch of reports (empty on idempotent success).
    ///
    /// # Errors
    ///
    /// Errors when no healthy client is available or every request fails.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);
        let healthy_transports = self.healthy_transports()?;
        let mut handles = Vec::with_capacity(healthy_transports.len());
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }
        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
            "no orders to cancel",
        )
        .await
    }

    /// Returns a consistent-enough snapshot of the aggregate metrics.
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
        let total_clients = self.transports.len();
        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Async convenience wrapper around [`Self::get_metrics`].
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }

    /// Returns per-client statistics for all pooled transports.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        self.transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Async convenience wrapper around [`Self::get_client_stats`].
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }

    /// Caches an instrument definition on every pooled client.
    pub fn cache_instrument(&self, instrument: &InstrumentAny) {
        for transport in self.transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

    /// Returns a handle sharing all state (transports, task, counters) with
    /// `self`, for moving into async contexts.
    #[must_use]
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

    /// Test-only constructor that injects pre-built (possibly mock) transports.
    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}
/// Point-in-time snapshot of the broadcaster's aggregate counters.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    /// Broadcast operations attempted (single, batch, and cancel-all).
    pub total_cancels: u64,
    /// Broadcasts where at least one client returned success.
    pub successful_cancels: u64,
    /// Broadcasts where every client failed (or none were healthy).
    pub failed_cancels: u64,
    /// Individual client rejections matching an expected-reject pattern.
    pub expected_rejects: u64,
    /// Broadcasts resolved as success because the order was already gone.
    pub idempotent_successes: u64,
    /// Clients currently marked healthy.
    pub healthy_clients: usize,
    /// Total clients in the pool.
    pub total_clients: usize,
}
/// Per-client statistics snapshot.
#[derive(Debug, Clone)]
pub struct ClientStats {
    /// Stable client identifier (e.g. "bitmex-cancel-0").
    pub client_id: String,
    /// Whether the client is currently marked healthy.
    pub healthy: bool,
    /// Cancel requests attempted through this client.
    pub cancel_count: u64,
    /// Failed cancel requests through this client.
    pub error_count: u64,
}
#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
use nautilus_core::UUID4;
use nautilus_model::{
enums::{
ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
},
identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
reports::OrderStatusReport,
types::{Price, Quantity},
};
use super::*;
// Test double whose single-order cancel behavior is supplied as a closure;
// batch and cancel-all delegate to (or bypass) the same handler.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
struct MockExecutor {
    handler: Arc<
        dyn Fn(
            InstrumentId,
            Option<ClientOrderId>,
            Option<VenueOrderId>,
        )
            -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
            + Send
            + Sync,
    >,
}
impl MockExecutor {
    // Wraps an async closure into the boxed-future handler form.
    fn new<F, Fut>(handler: F) -> Self
    where
        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        Self {
            handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
        }
    }
}
impl CancelExecutor for MockExecutor {
    // Always reports healthy.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async { Ok(()) })
    }

    // Single-order cancel: driven entirely by the injected handler.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        (self.handler)(instrument_id, client_order_id, venue_order_id)
    }

    // Batch cancel: stubbed to succeed with no reports.
    fn cancel_orders(
        &self,
        _instrument_id: InstrumentId,
        _client_order_ids: Option<Vec<ClientOrderId>>,
        _venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async { Ok(Vec::new()) })
    }

    // Cancel-all: reuses the handler's success/failure outcome, discarding the report.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        _order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        let handler = Arc::clone(&self.handler);
        Box::pin(async move { handler(instrument_id, None, None).await.map(|_| Vec::new()) })
    }

    // Instrument caching is irrelevant for the mock.
    fn add_instrument(&self, _instrument: InstrumentAny) {}
}
// Builds a minimal canceled-order report fixture for the given venue order ID.
fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
    OrderStatusReport {
        account_id: AccountId::from("BITMEX-001"),
        instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
        venue_order_id: VenueOrderId::from(venue_order_id),
        order_side: OrderSide::Buy,
        order_type: OrderType::Limit,
        time_in_force: TimeInForce::Gtc,
        order_status: OrderStatus::Canceled,
        price: Some(Price::new(50000.0, 2)),
        quantity: Quantity::new(100.0, 0),
        filled_qty: Quantity::new(0.0, 0),
        report_id: UUID4::new(),
        ts_accepted: 0.into(),
        ts_last: 0.into(),
        ts_init: 0.into(),
        client_order_id: None,
        avg_px: None,
        trigger_price: None,
        trigger_type: None,
        contingency_type: ContingencyType::NoContingency,
        expire_time: None,
        order_list_id: None,
        venue_position_id: None,
        linked_order_ids: None,
        parent_order_id: None,
        display_qty: None,
        limit_offset: None,
        trailing_offset: None,
        trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
        post_only: false,
        reduce_only: false,
        cancel_reason: None,
        ts_triggered: None,
    }
}
// Convenience: wraps an async cancel handler into a TransportClient backed by
// a MockExecutor.
fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
where
    F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
        + Send
        + Sync
        + 'static,
    Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
{
    TransportClient::new(MockExecutor::new(handler), client_id.to_string())
}
// The first client to succeed wins; the slow sibling is aborted and metrics
// record exactly one successful cancel.
#[tokio::test]
async fn test_broadcast_cancel_immediate_success() {
    let report = create_test_report("ORDER-1");
    let report_clone = report.clone();
    let transports = vec![
        create_stub_transport("client-0", move |_, _, _| {
            let report = report_clone.clone();
            async move { Ok(report) }
        }),
        create_stub_transport("client-1", |_, _, _| async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            anyhow::bail!("Should be aborted")
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
        .await;
    assert!(result.is_ok());
    let returned_report = result.unwrap();
    assert!(returned_report.is_some());
    assert_eq!(
        returned_report.unwrap().venue_order_id,
        report.venue_order_id
    );
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.successful_cancels, 1);
    assert_eq!(metrics.failed_cancels, 0);
    assert_eq!(metrics.total_cancels, 1);
}
// An "AlreadyCanceled" rejection resolves the broadcast as an idempotent
// success (Ok(None)) and aborts the remaining in-flight request.
#[tokio::test]
async fn test_broadcast_cancel_idempotent_success() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            anyhow::bail!("AlreadyCanceled")
        }),
        create_stub_transport("client-1", |_, _, _| async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            anyhow::bail!("Should be aborted")
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
        .await;
    assert!(result.is_ok());
    assert!(result.unwrap().is_none());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.idempotent_successes, 1);
    assert_eq!(metrics.successful_cancels, 0);
    assert_eq!(metrics.failed_cancels, 0);
}
// With one hard failure and one "not found" rejection, the idempotent outcome
// wins and the broadcast is not counted as failed.
#[tokio::test]
async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            anyhow::bail!("502 Bad Gateway")
        }),
        create_stub_transport("client-1", |_, _, _| async {
            anyhow::bail!("orderID not found")
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
        .await;
    assert!(result.is_ok());
    assert!(result.unwrap().is_none());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.idempotent_successes, 1);
    assert_eq!(metrics.failed_cancels, 0);
}
// When every client fails with a non-idempotent error, the broadcast errors
// and a single failed cancel is recorded.
#[tokio::test]
async fn test_broadcast_cancel_all_failures() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            anyhow::bail!("502 Bad Gateway")
        }),
        create_stub_transport("client-1", |_, _, _| async {
            anyhow::bail!("Connection refused")
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
    assert!(result.is_err());
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("All cancel all requests failed")
    );
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.failed_cancels, 1);
    assert_eq!(metrics.successful_cancels, 0);
    assert_eq!(metrics.idempotent_successes, 0);
}
// A pool whose only client is unhealthy fails fast with a clear error and
// counts one failed cancel.
#[tokio::test]
async fn test_broadcast_cancel_no_healthy_clients() {
    let transport = create_stub_transport("client-0", |_, _, _| async {
        Ok(create_test_report("ORDER-1"))
    });
    transport.healthy.store(false, Ordering::Relaxed);
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
        .await;
    assert!(result.is_err());
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("No healthy transport clients available")
    );
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.failed_cancels, 1);
}
// Two successful broadcasts increment total and successful counters by two.
#[tokio::test]
async fn test_broadcast_cancel_metrics_increment() {
    let report1 = create_test_report("ORDER-1");
    let report1_clone = report1.clone();
    let report2 = create_test_report("ORDER-2");
    let report2_clone = report2.clone();
    let transports = vec![
        create_stub_transport("client-0", move |_, _, _| {
            let report = report1_clone.clone();
            async move { Ok(report) }
        }),
        create_stub_transport("client-1", move |_, _, _| {
            let report = report2_clone.clone();
            async move { Ok(report) }
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let _ = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
        .await;
    let _ = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
        .await;
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_cancels, 2);
    assert_eq!(metrics.successful_cancels, 2);
}
// Expected-reject (post-only) errors from all clients are each counted as
// expected rejects, but the overall broadcast still fails.
#[tokio::test]
async fn test_broadcast_cancel_expected_reject_pattern() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
        }),
        create_stub_transport("client-1", |_, _, _| async {
            anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
        .await;
    assert!(result.is_err());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.expected_rejects, 2);
    assert_eq!(metrics.failed_cancels, 1);
}
// A freshly built three-client pool reports three clients and zeroed counters.
#[tokio::test]
async fn test_broadcaster_creation_with_pool() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-2", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 3);
    assert_eq!(metrics.total_cancels, 0);
    assert_eq!(metrics.successful_cancels, 0);
    assert_eq!(metrics.failed_cancels, 0);
}
// start/stop toggle the running flag and are both idempotent.
#[tokio::test]
async fn test_broadcaster_lifecycle() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    assert!(!broadcaster.running.load(Ordering::Relaxed));
    let start_result = broadcaster.start().await;
    assert!(start_result.is_ok());
    assert!(broadcaster.running.load(Ordering::Relaxed));
    let start_again = broadcaster.start().await;
    assert!(start_again.is_ok());
    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
}
// Client stats mirror the pool order with fresh (healthy, zeroed) state.
#[tokio::test]
async fn test_client_stats_collection() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let stats = broadcaster.get_client_stats_async().await;
    assert_eq!(stats.len(), 2);
    assert_eq!(stats[0].client_id, "client-0");
    assert_eq!(stats[1].client_id, "client-1");
    assert!(stats[0].healthy);
    assert!(stats[1].healthy);
    assert_eq!(stats[0].cancel_count, 0);
    assert_eq!(stats[1].cancel_count, 0);
    assert_eq!(stats[0].error_count, 0);
    assert_eq!(stats[1].error_count, 0);
}
// Construction succeeds for a testnet config with no explicit base URL
// (the testnet URL substitution path in `new`).
#[tokio::test]
async fn test_testnet_config_sets_base_url() {
    let config = CancelBroadcasterConfig {
        pool_size: 1,
        api_key: Some("test_key".to_string()),
        api_secret: Some("test_secret".to_string()),
        base_url: None,
        testnet: true,
        timeout_secs: 5,
        max_retries: 3,
        retry_delay_ms: 1_000,
        retry_delay_max_ms: 5_000,
        recv_window_ms: 10_000,
        max_requests_per_second: 10,
        max_requests_per_minute: 120,
        health_check_interval_secs: 60,
        health_check_timeout_secs: 5,
        expected_reject_patterns: vec![],
        idempotent_success_patterns: vec![],
        proxy_urls: vec![],
    };
    let broadcaster = CancelBroadcaster::new(config);
    assert!(broadcaster.is_ok());
}
// `new` builds exactly `pool_size` clients (here the default).
#[tokio::test]
async fn test_constructor_honors_default_pool_size() {
    let config = CancelBroadcasterConfig {
        api_key: Some("test_key".to_string()),
        api_secret: Some("test_secret".to_string()),
        base_url: Some("http://127.0.0.1:19999".to_string()),
        ..Default::default()
    };
    let expected_pool = config.pool_size;
    let broadcaster = CancelBroadcaster::new(config).unwrap();
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, expected_pool);
}
// Default config with two injected transports reports two clients.
#[tokio::test]
async fn test_default_config() {
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let config = CancelBroadcasterConfig::default();
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 2);
}
// clone_for_async shares counters in both directions (same Arcs).
#[tokio::test]
async fn test_clone_for_async() {
    let transports = vec![create_stub_transport("client-0", |_, _, _| async {
        Ok(create_test_report("ORDER-1"))
    })];
    let config = CancelBroadcasterConfig::default();
    let broadcaster1 = CancelBroadcaster::new_with_transports(config, transports);
    broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
    let broadcaster2 = broadcaster1.clone_for_async();
    let metrics2 = broadcaster2.get_metrics_async().await;
    assert_eq!(metrics2.total_cancels, 1);
    broadcaster2
        .successful_cancels
        .fetch_add(5, Ordering::Relaxed);
    let metrics1 = broadcaster1.get_metrics_async().await;
    assert_eq!(metrics1.successful_cancels, 5);
}
// Pattern classification is substring-based for both reject and idempotent
// pattern lists.
#[tokio::test]
async fn test_pattern_matching() {
    let transports = vec![create_stub_transport("client-0", |_, _, _| async {
        Ok(create_test_report("ORDER-1"))
    })];
    let config = CancelBroadcasterConfig {
        expected_reject_patterns: vec![
            "ParticipateDoNotInitiate".to_string(),
            "Close-only".to_string(),
        ],
        idempotent_success_patterns: vec![
            "AlreadyCanceled".to_string(),
            "orderID not found".to_string(),
            "Unable to cancel".to_string(),
        ],
        ..Default::default()
    };
    let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
    assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
    assert!(broadcaster.is_expected_reject("This is a Close-only order"));
    assert!(!broadcaster.is_expected_reject("Connection timeout"));
    assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
    assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
    assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
    assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
}
#[tokio::test]
async fn test_broadcast_batch_cancel_structure() {
    // A freshly constructed broadcaster exposes its transport count and
    // zeroed cancel counters before any batch cancel is issued.
    let config = CancelBroadcasterConfig {
        idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
        ..Default::default()
    };
    let broadcaster = CancelBroadcaster::new_with_transports(
        config,
        vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ],
    );

    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 2);
    // No cancels attempted yet, so every counter starts at zero.
    assert_eq!(metrics.total_cancels, 0);
    assert_eq!(metrics.successful_cancels, 0);
    assert_eq!(metrics.failed_cancels, 0);
}
#[tokio::test]
async fn test_broadcast_cancel_all_structure() {
    // Three succeeding stub transports: before any cancel-all is issued the
    // broadcaster should report all clients healthy and no cancels recorded.
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-2", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let broadcaster = CancelBroadcaster::new_with_transports(
        CancelBroadcasterConfig {
            idempotent_success_patterns: vec!["orderID not found".to_string()],
            ..Default::default()
        },
        transports,
    );

    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 3);
    assert_eq!(metrics.healthy_clients, 3);
    assert_eq!(metrics.total_cancels, 0);
}
#[tokio::test]
async fn test_single_cancel_metrics_with_mixed_responses() {
    // One transport fails hard ("Connection timeout") while the other errors
    // with "AlreadyCanceled", which the default config treats as an
    // idempotent success. The broadcast should resolve to Ok(None) and be
    // counted as an idempotent success rather than a failure.
    let broadcaster = CancelBroadcaster::new_with_transports(
        CancelBroadcasterConfig::default(),
        vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ],
    );

    let outcome = broadcaster
        .broadcast_cancel(
            InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            Some(ClientOrderId::from("O-123")),
            None,
        )
        .await;
    assert!(matches!(outcome, Ok(None)));

    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(
        metrics.failed_cancels, 0,
        "Idempotent success should not increment failed_cancels"
    );
    assert_eq!(metrics.idempotent_successes, 1);
    assert_eq!(metrics.successful_cancels, 0);
}
#[tokio::test]
async fn test_metrics_initialization_and_health() {
    // Four succeeding stubs: every cancel counter starts at zero and all
    // clients are considered healthy immediately after construction.
    let transports = vec![
        create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-1", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-2", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
        create_stub_transport("client-3", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        }),
    ];
    let broadcaster =
        CancelBroadcaster::new_with_transports(CancelBroadcasterConfig::default(), transports);

    let metrics = broadcaster.get_metrics_async().await;
    // All activity counters must be zero-initialized.
    for counter in [
        metrics.total_cancels,
        metrics.successful_cancels,
        metrics.failed_cancels,
        metrics.expected_rejects,
        metrics.idempotent_successes,
    ] {
        assert_eq!(counter, 0);
    }
    assert_eq!(metrics.healthy_clients, 4);
    assert_eq!(metrics.total_clients, 4);
}
#[tokio::test]
async fn test_health_check_task_lifecycle() {
    // `start` must set the running flag and spawn the health-check task;
    // `stop` must clear the flag and tear the task down again.
    let broadcaster = CancelBroadcaster::new_with_transports(
        CancelBroadcasterConfig {
            health_check_interval_secs: 1,
            health_check_timeout_secs: 1,
            ..Default::default()
        },
        vec![create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        })],
    );

    broadcaster.start().await.unwrap();
    assert!(broadcaster.running.load(Ordering::Relaxed));
    // The read guard is a temporary dropped at the end of the statement,
    // so no lock is held across the subsequent `stop`.
    assert!(broadcaster.health_check_task.read().await.is_some());

    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
    assert!(broadcaster.health_check_task.read().await.is_none());
}
}