use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::Duration,
};
use futures_util::future;
use nautilus_common::live::get_runtime;
use nautilus_model::{
enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
identifiers::{ClientOrderId, InstrumentId, OrderListId},
instruments::InstrumentAny,
reports::OrderStatusReport,
types::{Price, Quantity},
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};
use crate::{
common::{consts::BITMEX_HTTP_TESTNET_URL, enums::BitmexPegPriceType},
http::client::BitmexHttpClient,
};
/// Minimal interface over the BitMEX HTTP operations used by the broadcaster.
///
/// Implemented by the real `BitmexHttpClient` for production and by mock
/// executors in tests, so the fan-out and health-check logic can be exercised
/// without network I/O.
trait SubmitExecutor: Send + Sync {
    /// Caches an instrument definition on the underlying client.
    fn add_instrument(&self, instrument: InstrumentAny);
    /// Probes venue connectivity; `Ok(())` means the client is reachable.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
    /// Submits a single order and resolves to the venue's status report.
    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        trailing_offset: Option<f64>,
        trailing_offset_type: Option<TrailingOffsetType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
}
// Production executor: delegates straight to the inherent `BitmexHttpClient`
// methods. Fully-qualified `Self::method(self, ...)` call syntax is used so
// the inherent methods are targeted unambiguously rather than the identically
// named trait methods defined here.
impl SubmitExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            // A server-time request doubles as a cheap connectivity probe;
            // the timestamp itself is discarded.
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }
    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        trailing_offset: Option<f64>,
        trailing_offset_type: Option<TrailingOffsetType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        // Forward every argument verbatim to the inherent async method.
        Box::pin(async move {
            Self::submit_order(
                self,
                instrument_id,
                client_order_id,
                order_side,
                order_type,
                quantity,
                time_in_force,
                price,
                trigger_price,
                trigger_type,
                trailing_offset,
                trailing_offset_type,
                display_qty,
                post_only,
                reduce_only,
                order_list_id,
                contingency_type,
                peg_price_type,
                peg_offset_value,
            )
            .await
        })
    }
}
/// Configuration for `SubmitBroadcaster`.
#[derive(Debug, Clone)]
pub struct SubmitBroadcasterConfig {
    /// Number of independent HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key; `None` leaves the clients unauthenticated.
    pub api_key: Option<String>,
    /// BitMEX API secret.
    pub api_secret: Option<String>,
    /// Explicit HTTP base URL; when `None`, `testnet` selects the default.
    pub base_url: Option<String>,
    /// Use the BitMEX testnet endpoint when no explicit `base_url` is given.
    pub testnet: bool,
    /// Per-request timeout in seconds, forwarded to each HTTP client.
    pub timeout_secs: u64,
    /// Maximum retry attempts per request.
    pub max_retries: u32,
    /// Initial retry backoff in milliseconds.
    pub retry_delay_ms: u64,
    /// Maximum retry backoff in milliseconds.
    pub retry_delay_max_ms: u64,
    /// Receive window in milliseconds, forwarded to each HTTP client.
    pub recv_window_ms: u64,
    /// Per-client rate limit: requests per second.
    pub max_requests_per_second: u32,
    /// Per-client rate limit: requests per minute.
    pub max_requests_per_minute: u32,
    /// Interval between background health-check passes, in seconds.
    pub health_check_interval_secs: u64,
    /// Timeout applied to each individual health check, in seconds.
    pub health_check_timeout_secs: u64,
    /// Error-message substrings treated as expected rejections
    /// (counted separately in metrics rather than as hard failures).
    pub expected_reject_patterns: Vec<String>,
    /// Optional proxy URL per pool slot; missing or `None` entries mean the
    /// corresponding client connects directly.
    pub proxy_urls: Vec<Option<String>>,
}
impl Default for SubmitBroadcasterConfig {
    /// Defaults: a pool of three direct (non-proxied) clients against BitMEX
    /// mainnet, with "Duplicate clOrdID" pre-registered as an expected reject.
    fn default() -> Self {
        Self {
            // Connectivity and credentials: unauthenticated mainnet, no proxies.
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            proxy_urls: vec![],
            // Pool sizing and per-request timeout.
            pool_size: 3,
            timeout_secs: 60,
            // Retry backoff and request pacing.
            max_retries: 3,
            retry_delay_ms: 1_000,
            retry_delay_max_ms: 5_000,
            recv_window_ms: 10_000,
            max_requests_per_second: 10,
            max_requests_per_minute: 120,
            // Background health monitoring cadence.
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            // Rejections that indicate an idempotent duplicate, not a failure.
            expected_reject_patterns: vec!["Duplicate clOrdID".to_string()],
        }
    }
}
/// One pooled client: an executor plus its per-client health flag and counters.
///
/// All state is `Arc`-shared so clones observe the same health and metrics.
#[derive(Clone)]
struct TransportClient {
    // The order-submission backend (real HTTP client, or a mock in tests).
    executor: Arc<dyn SubmitExecutor>,
    // Stable identifier used in logs and stats (e.g. "bitmex-submit-0").
    client_id: String,
    // Last known health state, updated by health checks and submit outcomes.
    healthy: Arc<AtomicBool>,
    // Number of submit attempts routed through this client.
    submit_count: Arc<AtomicU64>,
    // Number of submit attempts that returned an error.
    error_count: Arc<AtomicU64>,
}
impl Debug for TransportClient {
    /// Formats the client's identity and counters; the executor itself is
    /// a trait object and is intentionally omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TransportClient");
        out.field("client_id", &self.client_id);
        out.field("healthy", &self.healthy);
        out.field("submit_count", &self.submit_count);
        out.field("error_count", &self.error_count);
        out.finish()
    }
}
impl TransportClient {
    /// Wraps `executor` in a transport handle that starts out healthy with
    /// zeroed submit/error counters.
    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            submit_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }
    /// Returns the client's current health flag.
    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }
    /// Flags the client as healthy.
    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }
    /// Flags the client as unhealthy.
    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }
    /// Number of submit attempts routed through this client so far.
    fn get_submit_count(&self) -> u64 {
        self.submit_count.load(Ordering::Relaxed)
    }
    /// Number of submit attempts that returned an error so far.
    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }
    /// Probes the executor and updates the health flag from the outcome.
    ///
    /// Returns `true` only when the probe succeeds within `timeout_secs`;
    /// both probe errors and timeouts mark the client unhealthy.
    async fn health_check(&self, timeout_secs: u64) -> bool {
        let probe = self.executor.health_check();
        let outcome = tokio::time::timeout(Duration::from_secs(timeout_secs), probe).await;
        match outcome {
            Ok(Ok(())) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                log::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }
    /// Submits one order via the wrapped executor, bumping `submit_count`
    /// up front and `error_count` on failure.
    #[allow(clippy::too_many_arguments)]
    async fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        trailing_offset: Option<f64>,
        trailing_offset_type: Option<TrailingOffsetType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.submit_count.fetch_add(1, Ordering::Relaxed);
        let outcome = self
            .executor
            .submit_order(
                instrument_id,
                client_order_id,
                order_side,
                order_type,
                quantity,
                time_in_force,
                price,
                trigger_price,
                trigger_type,
                trailing_offset,
                trailing_offset_type,
                display_qty,
                post_only,
                reduce_only,
                order_list_id,
                contingency_type,
                peg_price_type,
                peg_offset_value,
            )
            .await;
        match outcome {
            Ok(report) => {
                // A successful submit is also evidence the route is healthy.
                self.mark_healthy();
                Ok(report)
            }
            Err(err) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(err)
            }
        }
    }
}
/// Fans a single order submission out to a pool of independent HTTP clients
/// and returns the first successful response.
///
/// Each client may use a distinct proxy route; a background task periodically
/// health-checks the pool so unhealthy routes are skipped when broadcasting.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
#[derive(Debug)]
pub struct SubmitBroadcaster {
    // Construction-time settings (pool size, retries, health-check cadence).
    config: SubmitBroadcasterConfig,
    // Pooled transports; shared with the background health-check task.
    transports: Arc<Vec<TransportClient>>,
    // Handle to the periodic health-check task, present while started.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    // Set while the broadcaster (and its health-check loop) is active.
    running: Arc<AtomicBool>,
    // Aggregate metrics across all broadcasts (Arc-shared with clones).
    total_submits: Arc<AtomicU64>,
    successful_submits: Arc<AtomicU64>,
    failed_submits: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
}
impl SubmitBroadcaster {
pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
let mut transports = Vec::with_capacity(config.pool_size);
let base_url = if config.testnet && config.base_url.is_none() {
Some(BITMEX_HTTP_TESTNET_URL.to_string())
} else {
config.base_url.clone()
};
for i in 0..config.pool_size {
let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
let client = BitmexHttpClient::with_credentials(
config.api_key.clone(),
config.api_secret.clone(),
base_url.clone(),
config.timeout_secs,
config.max_retries,
config.retry_delay_ms,
config.retry_delay_max_ms,
config.recv_window_ms,
config.max_requests_per_second,
config.max_requests_per_minute,
proxy_url,
)
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
}
Ok(Self {
config,
transports: Arc::new(transports),
health_check_task: Arc::new(RwLock::new(None)),
running: Arc::new(AtomicBool::new(false)),
total_submits: Arc::new(AtomicU64::new(0)),
successful_submits: Arc::new(AtomicU64::new(0)),
failed_submits: Arc::new(AtomicU64::new(0)),
expected_rejects: Arc::new(AtomicU64::new(0)),
})
}
pub async fn start(&self) -> anyhow::Result<()> {
if self.running.load(Ordering::Relaxed) {
return Ok(());
}
self.running.store(true, Ordering::Relaxed);
self.run_health_checks().await;
let transports = Arc::clone(&self.transports);
let running = Arc::clone(&self.running);
let interval_secs = self.config.health_check_interval_secs;
let timeout_secs = self.config.health_check_timeout_secs;
let task = get_runtime().spawn(async move {
let mut ticker = interval(Duration::from_secs(interval_secs));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
if !running.load(Ordering::Relaxed) {
break;
}
let tasks: Vec<_> = transports
.iter()
.map(|t| t.health_check(timeout_secs))
.collect();
let results = future::join_all(tasks).await;
let healthy_count = results.iter().filter(|&&r| r).count();
log::debug!(
"Health check complete: {healthy_count}/{} clients healthy",
results.len()
);
}
});
*self.health_check_task.write().await = Some(task);
log::info!(
"SubmitBroadcaster started with {} clients",
self.transports.len()
);
Ok(())
}
pub async fn stop(&self) {
if !self.running.load(Ordering::Relaxed) {
return;
}
self.running.store(false, Ordering::Relaxed);
if let Some(task) = self.health_check_task.write().await.take() {
task.abort();
}
log::info!("SubmitBroadcaster stopped");
}
async fn run_health_checks(&self) {
let tasks: Vec<_> = self
.transports
.iter()
.map(|t| t.health_check(self.config.health_check_timeout_secs))
.collect();
let results = future::join_all(tasks).await;
let healthy_count = results.iter().filter(|&&r| r).count();
log::debug!(
"Health check complete: {healthy_count}/{} clients healthy",
results.len()
);
}
fn is_expected_reject(&self, error_message: &str) -> bool {
self.config
.expected_reject_patterns
.iter()
.any(|pattern| error_message.contains(pattern))
}
async fn process_submit_results<T>(
&self,
mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
operation: &str,
params: String,
) -> anyhow::Result<T>
where
T: Send + 'static,
{
let mut errors = Vec::new();
let mut all_duplicate_clordid = true;
while !handles.is_empty() {
let current_handles = std::mem::take(&mut handles);
let (result, _idx, remaining) = future::select_all(current_handles).await;
handles = remaining.into_iter().collect();
match result {
Ok((client_id, Ok(result))) => {
for handle in &handles {
handle.abort();
}
self.successful_submits.fetch_add(1, Ordering::Relaxed);
log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
return Ok(result);
}
Ok((client_id, Err(e))) => {
let error_msg = e.to_string();
let is_duplicate = error_msg.contains("Duplicate clOrdID");
if !is_duplicate {
all_duplicate_clordid = false;
}
if self.is_expected_reject(&error_msg) {
self.expected_rejects.fetch_add(1, Ordering::Relaxed);
log::debug!(
"Expected {} rejection [{client_id}]: {error_msg} {params}",
operation.to_lowercase(),
);
errors.push(error_msg);
} else {
log::warn!(
"{operation} request failed [{client_id}]: {error_msg} {params}",
);
errors.push(error_msg);
}
}
Err(e) => {
all_duplicate_clordid = false;
log::warn!("{operation} task join error: {e:?}");
errors.push(format!("Task panicked: {e:?}"));
}
}
}
self.failed_submits.fetch_add(1, Ordering::Relaxed);
if all_duplicate_clordid && !errors.is_empty() {
log::warn!(
"All {} requests returned 'Duplicate clOrdID' - order likely exists {params}",
operation.to_lowercase(),
);
anyhow::bail!("IDEMPOTENT_DUPLICATE: Order likely exists but confirmation was lost");
}
log::error!(
"All {} requests failed: {errors:?} {params}",
operation.to_lowercase(),
);
Err(anyhow::anyhow!(
"All {} requests failed: {:?}",
operation.to_lowercase(),
errors
))
}
#[allow(clippy::too_many_arguments)]
pub async fn broadcast_submit(
&self,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
order_side: OrderSide,
order_type: OrderType,
quantity: Quantity,
time_in_force: TimeInForce,
price: Option<Price>,
trigger_price: Option<Price>,
trigger_type: Option<TriggerType>,
trailing_offset: Option<f64>,
trailing_offset_type: Option<TrailingOffsetType>,
display_qty: Option<Quantity>,
post_only: bool,
reduce_only: bool,
order_list_id: Option<OrderListId>,
contingency_type: Option<ContingencyType>,
submit_tries: Option<usize>,
peg_price_type: Option<BitmexPegPriceType>,
peg_offset_value: Option<f64>,
) -> anyhow::Result<OrderStatusReport> {
self.total_submits.fetch_add(1, Ordering::Relaxed);
let pool_size = self.config.pool_size;
let actual_tries = if let Some(t) = submit_tries {
if t > pool_size {
log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
}
std::cmp::min(t, pool_size)
} else {
pool_size
};
log::debug!(
"Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
);
let healthy_transports: Vec<TransportClient> = self
.transports
.iter()
.filter(|t| t.is_healthy())
.take(actual_tries)
.cloned()
.collect();
if healthy_transports.is_empty() {
self.failed_submits.fetch_add(1, Ordering::Relaxed);
anyhow::bail!("No healthy transport clients available");
}
log::debug!(
"Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
healthy_transports.len(),
);
let mut handles = Vec::new();
for transport in healthy_transports {
let handle = get_runtime().spawn(async move {
let client_id = transport.client_id.clone();
let result = transport
.submit_order(
instrument_id,
client_order_id,
order_side,
order_type,
quantity,
time_in_force,
price,
trigger_price,
trigger_type,
trailing_offset,
trailing_offset_type,
display_qty,
post_only,
reduce_only,
order_list_id,
contingency_type,
peg_price_type,
peg_offset_value,
)
.await;
(client_id, result)
});
handles.push(handle);
}
self.process_submit_results(
handles,
"Submit",
format!("(client_order_id={client_order_id:?})"),
)
.await
}
pub fn get_metrics(&self) -> BroadcasterMetrics {
let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
let total_clients = self.transports.len();
BroadcasterMetrics {
total_submits: self.total_submits.load(Ordering::Relaxed),
successful_submits: self.successful_submits.load(Ordering::Relaxed),
failed_submits: self.failed_submits.load(Ordering::Relaxed),
expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
healthy_clients,
total_clients,
}
}
pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
self.get_metrics()
}
pub fn get_client_stats(&self) -> Vec<ClientStats> {
self.transports
.iter()
.map(|t| ClientStats {
client_id: t.client_id.clone(),
healthy: t.is_healthy(),
submit_count: t.get_submit_count(),
error_count: t.get_error_count(),
})
.collect()
}
pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
self.get_client_stats()
}
pub fn cache_instrument(&self, instrument: &InstrumentAny) {
for transport in self.transports.iter() {
transport.executor.add_instrument(instrument.clone());
}
}
#[must_use]
pub fn clone_for_async(&self) -> Self {
Self {
config: self.config.clone(),
transports: Arc::clone(&self.transports),
health_check_task: Arc::clone(&self.health_check_task),
running: Arc::clone(&self.running),
total_submits: Arc::clone(&self.total_submits),
successful_submits: Arc::clone(&self.successful_submits),
failed_submits: Arc::clone(&self.failed_submits),
expected_rejects: Arc::clone(&self.expected_rejects),
}
}
#[cfg(test)]
fn new_with_transports(
config: SubmitBroadcasterConfig,
transports: Vec<TransportClient>,
) -> Self {
Self {
config,
transports: Arc::new(transports),
health_check_task: Arc::new(RwLock::new(None)),
running: Arc::new(AtomicBool::new(false)),
total_submits: Arc::new(AtomicU64::new(0)),
successful_submits: Arc::new(AtomicU64::new(0)),
failed_submits: Arc::new(AtomicU64::new(0)),
expected_rejects: Arc::new(AtomicU64::new(0)),
}
}
}
/// Aggregate snapshot of broadcaster activity and pool health.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    /// Total broadcast attempts.
    pub total_submits: u64,
    /// Broadcasts in which at least one client succeeded.
    pub successful_submits: u64,
    /// Broadcasts in which every client failed.
    pub failed_submits: u64,
    /// Individual client errors matching an expected-reject pattern.
    pub expected_rejects: u64,
    /// Clients currently marked healthy.
    pub healthy_clients: usize,
    /// Total clients in the pool.
    pub total_clients: usize,
}
/// Per-client health and counter statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    /// Stable client identifier (e.g. "bitmex-submit-0").
    pub client_id: String,
    /// Whether the client is currently marked healthy.
    pub healthy: bool,
    /// Submit attempts routed through this client.
    pub submit_count: u64,
    /// Submit attempts that returned an error.
    pub error_count: u64,
}
#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
use nautilus_core::UUID4;
use nautilus_model::{
enums::{
ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
},
identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
reports::OrderStatusReport,
types::{Price, Quantity},
};
use super::*;
/// Test double for `SubmitExecutor` driven by an injected async closure.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
struct MockExecutor {
    // Factory invoked on every `submit_order`; returns the canned future.
    handler: Arc<
        dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
            + Send
            + Sync,
    >,
}
impl MockExecutor {
    /// Wraps an async closure so each invocation produces a fresh boxed future.
    fn new<F, Fut>(handler: F) -> Self
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        Self {
            handler: Arc::new(move || Box::pin(handler())),
        }
    }
}
impl SubmitExecutor for MockExecutor {
    // Mock is always "reachable": health checks succeed immediately.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async { Ok(()) })
    }
    // All order parameters are ignored; the injected handler decides the outcome.
    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        _instrument_id: InstrumentId,
        _client_order_id: ClientOrderId,
        _order_side: OrderSide,
        _order_type: OrderType,
        _quantity: Quantity,
        _time_in_force: TimeInForce,
        _price: Option<Price>,
        _trigger_price: Option<Price>,
        _trigger_type: Option<TriggerType>,
        _trailing_offset: Option<f64>,
        _trailing_offset_type: Option<TrailingOffsetType>,
        _display_qty: Option<Quantity>,
        _post_only: bool,
        _reduce_only: bool,
        _order_list_id: Option<OrderListId>,
        _contingency_type: Option<ContingencyType>,
        _peg_price_type: Option<BitmexPegPriceType>,
        _peg_offset_value: Option<f64>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        (self.handler)()
    }
    // Instrument caching is a no-op for the mock.
    fn add_instrument(&self, _instrument: InstrumentAny) {
    }
}
// Builds a minimal accepted-limit-order report for XBTUSD with the given
// venue order ID; all optional fields are left unset.
fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
    OrderStatusReport {
        account_id: AccountId::from("BITMEX-001"),
        instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
        venue_order_id: VenueOrderId::from(venue_order_id),
        order_side: OrderSide::Buy,
        order_type: OrderType::Limit,
        time_in_force: TimeInForce::Gtc,
        order_status: OrderStatus::Accepted,
        price: Some(Price::new(50000.0, 2)),
        quantity: Quantity::new(100.0, 0),
        filled_qty: Quantity::new(0.0, 0),
        report_id: UUID4::new(),
        ts_accepted: 0.into(),
        ts_last: 0.into(),
        ts_init: 0.into(),
        client_order_id: None,
        avg_px: None,
        trigger_price: None,
        trigger_type: None,
        contingency_type: ContingencyType::NoContingency,
        expire_time: None,
        order_list_id: None,
        venue_position_id: None,
        linked_order_ids: None,
        parent_order_id: None,
        display_qty: None,
        limit_offset: None,
        trailing_offset: None,
        trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
        post_only: false,
        reduce_only: false,
        cancel_reason: None,
        ts_triggered: None,
    }
}
// Wraps an async closure in a `TransportClient` backed by a `MockExecutor`.
fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
where
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
{
    let executor = MockExecutor::new(handler);
    TransportClient::new(executor, client_id.to_string())
}
// First client succeeds immediately: the slow second client should be
// aborted, the report propagated, and success metrics incremented.
#[tokio::test]
async fn test_broadcast_submit_immediate_success() {
    let report = create_test_report("ORDER-1");
    let report_clone = report.clone();
    let transports = vec![
        create_stub_transport("client-0", move || {
            let report = report_clone.clone();
            async move { Ok(report) }
        }),
        create_stub_transport("client-1", || async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            anyhow::bail!("Should be aborted")
        }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_ok());
    let returned_report = result.unwrap();
    assert_eq!(returned_report.venue_order_id, report.venue_order_id);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.successful_submits, 1);
    assert_eq!(metrics.failed_submits, 0);
    assert_eq!(metrics.total_submits, 1);
}
// A "Duplicate clOrdID" rejection is counted as an expected reject while the
// overall broadcast still fails (no client produced a report).
#[tokio::test]
async fn test_broadcast_submit_duplicate_clordid_expected() {
    let transports = vec![
        create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
        create_stub_transport("client-1", || async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            anyhow::bail!("Should be aborted")
        }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_err());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.expected_rejects, 1);
    assert_eq!(metrics.successful_submits, 0);
    assert_eq!(metrics.failed_submits, 1);
}
// When every client fails with an unexpected error the broadcast reports an
// aggregate "All submit requests failed" error.
#[tokio::test]
async fn test_broadcast_submit_all_failures() {
    let transports = vec![
        create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
        create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-456"),
            OrderSide::Sell,
            OrderType::Market,
            Quantity::new(50.0, 0),
            TimeInForce::Ioc,
            None,
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_err());
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("All submit requests failed")
    );
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.failed_submits, 1);
    assert_eq!(metrics.successful_submits, 0);
}
// With the only client forced unhealthy, the broadcast fails fast without
// dispatching any request.
#[tokio::test]
async fn test_broadcast_submit_no_healthy_clients() {
    let transport =
        create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
    transport.healthy.store(false, Ordering::Relaxed);
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-789"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_err());
    assert!(
        result
            .unwrap_err()
            .to_string()
            .contains("No healthy transport clients available")
    );
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.failed_submits, 1);
}
// The injected three-transport pool is reflected in `total_clients`.
#[tokio::test]
async fn test_default_config() {
    let report = create_test_report("ORDER-1");
    let transports: Vec<TransportClient> = (0..3)
        .map(|i| {
            let r = report.clone();
            create_stub_transport(&format!("client-{i}"), move || {
                let r = r.clone();
                async move { Ok(r) }
            })
        })
        .collect();
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 3);
}
// start/stop toggle the running flag and are idempotent in both directions.
#[tokio::test]
async fn test_broadcaster_lifecycle() {
    let report = create_test_report("ORDER-1");
    let transports: Vec<TransportClient> = (0..2)
        .map(|i| {
            let r = report.clone();
            create_stub_transport(&format!("client-{i}"), move || {
                let r = r.clone();
                async move { Ok(r) }
            })
        })
        .collect();
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    assert!(!broadcaster.running.load(Ordering::Relaxed));
    let start_result = broadcaster.start().await;
    assert!(start_result.is_ok());
    assert!(broadcaster.running.load(Ordering::Relaxed));
    let start_again = broadcaster.start().await;
    assert!(start_again.is_ok());
    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
}
// A single successful broadcast increments total and success counters only.
#[tokio::test]
async fn test_broadcast_submit_metrics_increment() {
    let report = create_test_report("ORDER-1");
    let report_clone = report.clone();
    let transports = vec![create_stub_transport("client-0", move || {
        let report = report_clone.clone();
        async move { Ok(report) }
    })];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let _ = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_submits, 1);
    assert_eq!(metrics.successful_submits, 1);
    assert_eq!(metrics.failed_submits, 0);
}
// A four-transport pool is reflected in `total_clients`.
#[tokio::test]
async fn test_broadcaster_creation_with_pool() {
    let report = create_test_report("ORDER-1");
    let transports: Vec<TransportClient> = (0..4)
        .map(|i| {
            let r = report.clone();
            create_stub_transport(&format!("client-{i}"), move || {
                let r = r.clone();
                async move { Ok(r) }
            })
        })
        .collect();
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, 4);
}
// After a broadcast where both clients fail, each client's stats show one
// attempt and one error.
#[tokio::test]
async fn test_client_stats_collection() {
    let transports = vec![
        create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
        create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let _ = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    let stats = broadcaster.get_client_stats_async().await;
    assert_eq!(stats.len(), 2);
    let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
    assert_eq!(client0.submit_count, 1);
    assert_eq!(client0.error_count, 1);
    let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
    assert_eq!(client1.submit_count, 1);
    assert_eq!(client1.error_count, 1);
}
// Construction succeeds when `testnet` is set without an explicit base URL
// (the testnet default is applied internally).
#[tokio::test]
async fn test_testnet_config_sets_base_url() {
    let config = SubmitBroadcasterConfig {
        pool_size: 1,
        api_key: Some("test_key".to_string()),
        api_secret: Some("test_secret".to_string()),
        testnet: true,
        base_url: None,
        ..Default::default()
    };
    let broadcaster = SubmitBroadcaster::new(config);
    assert!(broadcaster.is_ok());
}
// `new` creates exactly `pool_size` clients when using the default config.
#[tokio::test]
async fn test_constructor_honors_default_pool_size() {
    let config = SubmitBroadcasterConfig {
        api_key: Some("test_key".to_string()),
        api_secret: Some("test_secret".to_string()),
        base_url: Some("http://127.0.0.1:19999".to_string()),
        ..Default::default()
    };
    let expected_pool = config.pool_size;
    let broadcaster = SubmitBroadcaster::new(config).unwrap();
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_clients, expected_pool);
}
// Clones share metric counters: an increment on the original is visible
// through the clone.
#[tokio::test]
async fn test_clone_for_async() {
    let report = create_test_report("ORDER-1");
    let transports = vec![create_stub_transport("client-0", move || {
        let r = report.clone();
        async move { Ok(r) }
    })];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let cloned = broadcaster.clone_for_async();
    broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
    assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
}
// Expected-reject matching is a substring check against each pattern.
#[tokio::test]
async fn test_pattern_matching() {
    let config = SubmitBroadcasterConfig {
        expected_reject_patterns: vec![
            "Duplicate clOrdID".to_string(),
            "Order already exists".to_string(),
        ],
        ..Default::default()
    };
    let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
    assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
    assert!(broadcaster.is_expected_reject("Order already exists in system"));
    assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
    assert!(!broadcaster.is_expected_reject("Internal server error"));
}
// One success and one failure still count as a successful broadcast overall.
#[tokio::test]
async fn test_submit_metrics_with_mixed_responses() {
    let report = create_test_report("ORDER-1");
    let report_clone = report.clone();
    let transports = vec![
        create_stub_transport("client-0", move || {
            let report = report_clone.clone();
            async move { Ok(report) }
        }),
        create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_ok());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_submits, 1);
    assert_eq!(metrics.successful_submits, 1);
    assert_eq!(metrics.failed_submits, 0);
}
// A fresh broadcaster reports zeroed counters and all clients healthy.
#[tokio::test]
async fn test_metrics_initialization_and_health() {
    let report = create_test_report("ORDER-1");
    let transports: Vec<TransportClient> = (0..2)
        .map(|i| {
            let r = report.clone();
            create_stub_transport(&format!("client-{i}"), move || {
                let r = r.clone();
                async move { Ok(r) }
            })
        })
        .collect();
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.total_submits, 0);
    assert_eq!(metrics.successful_submits, 0);
    assert_eq!(metrics.failed_submits, 0);
    assert_eq!(metrics.expected_rejects, 0);
    assert_eq!(metrics.total_clients, 2);
    assert_eq!(metrics.healthy_clients, 2);
}
// `start` installs the background health-check task handle; `stop` clears
// the running flag.
#[tokio::test]
async fn test_health_check_task_lifecycle() {
    let report = create_test_report("ORDER-1");
    let transports: Vec<TransportClient> = (0..2)
        .map(|i| {
            let r = report.clone();
            create_stub_transport(&format!("client-{i}"), move || {
                let r = r.clone();
                async move { Ok(r) }
            })
        })
        .collect();
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    broadcaster.start().await.unwrap();
    assert!(broadcaster.running.load(Ordering::Relaxed));
    assert!(
        broadcaster
            .health_check_task
            .read()
            .await
            .as_ref()
            .is_some()
    );
    broadcaster.stop().await;
    assert!(!broadcaster.running.load(Ordering::Relaxed));
}
// A duplicate-clOrdID message with extra context still matches the expected
// reject pattern (substring match) and is counted accordingly.
#[tokio::test]
async fn test_expected_reject_pattern_comprehensive() {
    let transports = vec![
        create_stub_transport("client-0", || async {
            anyhow::bail!("Duplicate clOrdID: O-123 already exists")
        }),
        create_stub_transport("client-1", || async {
            tokio::time::sleep(Duration::from_secs(10)).await;
            anyhow::bail!("Should be aborted")
        }),
    ];
    let config = SubmitBroadcasterConfig::default();
    let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
    let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
    let result = broadcaster
        .broadcast_submit(
            instrument_id,
            ClientOrderId::from("O-123"),
            OrderSide::Buy,
            OrderType::Limit,
            Quantity::new(100.0, 0),
            TimeInForce::Gtc,
            Some(Price::new(50000.0, 2)),
            None,
            None,
            None,
            None,
            None,
            false,
            false,
            None,
            None,
            None,
            None,
            None,
        )
        .await;
    assert!(result.is_err());
    let metrics = broadcaster.get_metrics_async().await;
    assert_eq!(metrics.expected_rejects, 1);
    assert_eq!(metrics.failed_submits, 1);
    assert_eq!(metrics.successful_submits, 0);
}
#[tokio::test]
async fn test_client_order_id_suffix_for_multiple_clients() {
use std::sync::{Arc, Mutex};
#[derive(Clone)]
struct CaptureExecutor {
captured_ids: Arc<Mutex<Vec<String>>>,
barrier: Arc<tokio::sync::Barrier>,
report: OrderStatusReport,
}
impl SubmitExecutor for CaptureExecutor {
fn health_check(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
#[allow(clippy::too_many_arguments)]
fn submit_order(
&self,
_instrument_id: InstrumentId,
client_order_id: ClientOrderId,
_order_side: OrderSide,
_order_type: OrderType,
_quantity: Quantity,
_time_in_force: TimeInForce,
_price: Option<Price>,
_trigger_price: Option<Price>,
_trigger_type: Option<TriggerType>,
_trailing_offset: Option<f64>,
_trailing_offset_type: Option<TrailingOffsetType>,
_display_qty: Option<Quantity>,
_post_only: bool,
_reduce_only: bool,
_order_list_id: Option<OrderListId>,
_contingency_type: Option<ContingencyType>,
_peg_price_type: Option<BitmexPegPriceType>,
_peg_offset_value: Option<f64>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
{
self.captured_ids
.lock()
.unwrap()
.push(client_order_id.as_str().to_string());
let report = self.report.clone();
let barrier = Arc::clone(&self.barrier);
Box::pin(async move {
barrier.wait().await;
Ok(report)
})
}
fn add_instrument(&self, _instrument: InstrumentAny) {}
}
let captured_ids = Arc::new(Mutex::new(Vec::new()));
let barrier = Arc::new(tokio::sync::Barrier::new(3));
let report = create_test_report("ORDER-1");
let transports = vec![
TransportClient::new(
CaptureExecutor {
captured_ids: Arc::clone(&captured_ids),
barrier: Arc::clone(&barrier),
report: report.clone(),
},
"client-0".to_string(),
),
TransportClient::new(
CaptureExecutor {
captured_ids: Arc::clone(&captured_ids),
barrier: Arc::clone(&barrier),
report: report.clone(),
},
"client-1".to_string(),
),
TransportClient::new(
CaptureExecutor {
captured_ids: Arc::clone(&captured_ids),
barrier: Arc::clone(&barrier),
report: report.clone(),
},
"client-2".to_string(),
),
];
let config = SubmitBroadcasterConfig::default();
let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
let result = broadcaster
.broadcast_submit(
instrument_id,
ClientOrderId::from("O-123"),
OrderSide::Buy,
OrderType::Limit,
Quantity::new(100.0, 0),
TimeInForce::Gtc,
Some(Price::new(50000.0, 2)),
None,
None,
None,
None,
None,
false,
false,
None,
None,
None,
None,
None,
)
.await;
assert!(result.is_ok());
let ids = captured_ids.lock().unwrap();
assert_eq!(ids.len(), 3);
assert!(ids.iter().all(|id| id == "O-123")); }
#[tokio::test]
async fn test_client_order_id_suffix_with_partial_failure() {
use std::sync::{Arc, Mutex};
#[derive(Clone)]
struct CaptureAndFailExecutor {
captured_ids: Arc<Mutex<Vec<String>>>,
barrier: Arc<tokio::sync::Barrier>,
should_succeed: bool,
}
impl SubmitExecutor for CaptureAndFailExecutor {
fn health_check(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
Box::pin(async { Ok(()) })
}
#[allow(clippy::too_many_arguments)]
fn submit_order(
&self,
_instrument_id: InstrumentId,
client_order_id: ClientOrderId,
_order_side: OrderSide,
_order_type: OrderType,
_quantity: Quantity,
_time_in_force: TimeInForce,
_price: Option<Price>,
_trigger_price: Option<Price>,
_trigger_type: Option<TriggerType>,
_trailing_offset: Option<f64>,
_trailing_offset_type: Option<TrailingOffsetType>,
_display_qty: Option<Quantity>,
_post_only: bool,
_reduce_only: bool,
_order_list_id: Option<OrderListId>,
_contingency_type: Option<ContingencyType>,
_peg_price_type: Option<BitmexPegPriceType>,
_peg_offset_value: Option<f64>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
{
self.captured_ids
.lock()
.unwrap()
.push(client_order_id.as_str().to_string());
let barrier = Arc::clone(&self.barrier);
let should_succeed = self.should_succeed;
Box::pin(async move {
barrier.wait().await;
if should_succeed {
Ok(create_test_report("ORDER-1"))
} else {
anyhow::bail!("Network error")
}
})
}
fn add_instrument(&self, _instrument: InstrumentAny) {}
}
let captured_ids = Arc::new(Mutex::new(Vec::new()));
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let transports = vec![
TransportClient::new(
CaptureAndFailExecutor {
captured_ids: Arc::clone(&captured_ids),
barrier: Arc::clone(&barrier),
should_succeed: false,
},
"client-0".to_string(),
),
TransportClient::new(
CaptureAndFailExecutor {
captured_ids: Arc::clone(&captured_ids),
barrier: Arc::clone(&barrier),
should_succeed: true,
},
"client-1".to_string(),
),
];
let config = SubmitBroadcasterConfig::default();
let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
let result = broadcaster
.broadcast_submit(
instrument_id,
ClientOrderId::from("O-456"),
OrderSide::Sell,
OrderType::Market,
Quantity::new(50.0, 0),
TimeInForce::Ioc,
None,
None,
None,
None,
None,
None,
false,
false,
None,
None,
None,
None,
None,
)
.await;
assert!(result.is_ok());
let ids = captured_ids.lock().unwrap();
assert_eq!(ids.len(), 2);
assert!(ids.iter().all(|id| id == "O-456")); }
#[tokio::test]
async fn test_proxy_urls_populated_from_config() {
let config = SubmitBroadcasterConfig {
pool_size: 3,
api_key: Some("test_key".to_string()),
api_secret: Some("test_secret".to_string()),
proxy_urls: vec![
Some("http://proxy1:8080".to_string()),
Some("http://proxy2:8080".to_string()),
Some("http://proxy3:8080".to_string()),
],
..Default::default()
};
assert_eq!(config.proxy_urls.len(), 3);
assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
}
}