use tokio::sync::broadcast;
use bat_markets_core::{
AmendOrderRequest, AmendOrdersRequest, CancelAllOrdersRequest, CancelOrderRequest,
CancelOrdersRequest, ClosePositionRequest, CommandAck, CommandLaneEvent, CommandLifecycleEvent,
CommandReceipt, CommandStatus, CommandTransport, CreateOrderRequest, CreateOrdersRequest,
ErrorKind, ReconcileOutcome, ReconcileReport, Result, SetLeverageRequest, SetMarginModeRequest,
SetPositionModeRequest, ValidateOrderRequest,
};
use crate::{client::BatMarkets, runtime};
pub struct PendingCommandHandle {
ack: CommandAck,
receiver: broadcast::Receiver<CommandLaneEvent>,
initial_receipt_pending: bool,
}
impl PendingCommandHandle {
pub(crate) fn from_ack(
ack: CommandAck,
receiver: broadcast::Receiver<CommandLaneEvent>,
) -> Self {
Self {
ack,
receiver,
initial_receipt_pending: true,
}
}
pub(crate) fn from_receipt(
receipt: CommandReceipt,
transport: CommandTransport,
receiver: broadcast::Receiver<CommandLaneEvent>,
) -> Self {
Self::from_ack(
CommandAck {
receipt,
transport,
acknowledged_at: timestamp_now_ms(),
},
receiver,
)
}
#[must_use]
pub const fn ack(&self) -> &CommandAck {
&self.ack
}
pub async fn receipt(&mut self) -> Result<CommandReceipt> {
if self.initial_receipt_pending {
self.initial_receipt_pending = false;
return Ok(self.ack.receipt.clone());
}
loop {
let event = self.receiver_mut().recv().await.map_err(|error| {
bat_markets_core::MarketError::new(
ErrorKind::TransportError,
format!("command receipt receive failed: {error}"),
)
})?;
let CommandLaneEvent::Receipt(receipt) = event else {
continue;
};
if matches_receipt(&self.ack.receipt, &receipt) {
return Ok(receipt);
}
}
}
pub async fn next_lifecycle(&mut self) -> Result<CommandLifecycleEvent> {
loop {
let event = self.receiver_mut().recv().await.map_err(|error| {
bat_markets_core::MarketError::new(
ErrorKind::TransportError,
format!("command lifecycle receive failed: {error}"),
)
})?;
let CommandLaneEvent::Lifecycle(lifecycle) = event else {
continue;
};
if matches_lifecycle(&self.ack.receipt, &lifecycle) {
return Ok(lifecycle);
}
}
}
pub async fn resolved(&mut self) -> Result<CommandReceipt> {
if self.ack.receipt.status != CommandStatus::UnknownExecution {
return Ok(self.ack.receipt.clone());
}
loop {
let lifecycle = self.next_lifecycle().await?;
if let CommandLifecycleEvent::RecoveryCompleted { report, .. } = lifecycle {
return Ok(receipt_after_recovery(&self.ack.receipt, &report));
}
}
}
fn receiver_mut(&mut self) -> &mut broadcast::Receiver<CommandLaneEvent> {
&mut self.receiver
}
}
pub struct EntryClient<'a> {
inner: &'a BatMarkets,
}
impl<'a> EntryClient<'a> {
pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
Self { inner }
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<CommandLaneEvent> {
self.inner.shared.subscribe_command_events()
}
pub async fn create_order(&self, request: &CreateOrderRequest) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let ack = runtime::create_order(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_ack(ack, receiver))
}
pub async fn create_orders(
&self,
request: &CreateOrdersRequest,
) -> Result<Vec<PendingCommandHandle>> {
let mut receivers = pre_subscribe_command_receivers(self.inner, request.orders.len());
let acks = runtime::create_orders(&self.inner.live_context(), request).await?;
Ok(acks
.into_iter()
.zip(receivers.drain(..))
.map(|(ack, receiver)| PendingCommandHandle::from_ack(ack, receiver))
.collect())
}
pub async fn amend_order(&self, request: &AmendOrderRequest) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let ack = runtime::amend_order(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_ack(ack, receiver))
}
pub async fn amend_orders(
&self,
request: &AmendOrdersRequest,
) -> Result<Vec<PendingCommandHandle>> {
let mut receivers = pre_subscribe_command_receivers(self.inner, request.orders.len());
let acks = runtime::amend_orders(&self.inner.live_context(), request).await?;
Ok(acks
.into_iter()
.zip(receivers.drain(..))
.map(|(ack, receiver)| PendingCommandHandle::from_ack(ack, receiver))
.collect())
}
pub async fn cancel_order(&self, request: &CancelOrderRequest) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let ack = runtime::cancel_order(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_ack(ack, receiver))
}
pub async fn cancel_orders(
&self,
request: &CancelOrdersRequest,
) -> Result<Vec<PendingCommandHandle>> {
let mut receivers = pre_subscribe_command_receivers(self.inner, request.orders.len());
let acks = runtime::cancel_orders(&self.inner.live_context(), request).await?;
Ok(acks
.into_iter()
.zip(receivers.drain(..))
.map(|(ack, receiver)| PendingCommandHandle::from_ack(ack, receiver))
.collect())
}
pub async fn cancel_all_orders(
&self,
request: &CancelAllOrdersRequest,
) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let receipt = runtime::cancel_all_orders(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_receipt(
receipt,
CommandTransport::Rest,
receiver,
))
}
pub async fn close_position(
&self,
request: &ClosePositionRequest,
) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let ack = runtime::close_position(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_ack(ack, receiver))
}
pub async fn validate_order(
&self,
request: &ValidateOrderRequest,
) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let receipt = runtime::validate_order(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_receipt(
receipt,
CommandTransport::Rest,
receiver,
))
}
pub async fn set_leverage(&self, request: &SetLeverageRequest) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let receipt = runtime::set_leverage(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_receipt(
receipt,
CommandTransport::Rest,
receiver,
))
}
pub async fn set_margin_mode(
&self,
request: &SetMarginModeRequest,
) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let receipt = runtime::set_margin_mode(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_receipt(
receipt,
CommandTransport::Rest,
receiver,
))
}
pub async fn set_position_mode(
&self,
request: &SetPositionModeRequest,
) -> Result<PendingCommandHandle> {
let receiver = self.inner.shared.subscribe_command_events();
let receipt = runtime::set_position_mode(&self.inner.live_context(), request).await?;
Ok(PendingCommandHandle::from_receipt(
receipt,
CommandTransport::Rest,
receiver,
))
}
}
fn pre_subscribe_command_receivers(
inner: &BatMarkets,
count: usize,
) -> Vec<broadcast::Receiver<CommandLaneEvent>> {
(0..count)
.map(|_| inner.shared.subscribe_command_events())
.collect()
}
fn matches_lifecycle(receipt: &CommandReceipt, lifecycle: &CommandLifecycleEvent) -> bool {
match lifecycle {
CommandLifecycleEvent::Ack(ack)
| CommandLifecycleEvent::RecoveryScheduled(ack)
| CommandLifecycleEvent::RecoveryCompleted { ack, .. } => {
matches_receipt(receipt, &ack.receipt)
}
CommandLifecycleEvent::Receipt(other) => matches_receipt(receipt, other),
}
}
fn matches_receipt(left: &CommandReceipt, right: &CommandReceipt) -> bool {
if left.operation != right.operation {
return false;
}
if let (Some(left_id), Some(right_id)) = (&left.order_id, &right.order_id) {
return left_id == right_id;
}
if let (Some(left_id), Some(right_id)) = (&left.client_order_id, &right.client_order_id) {
return left_id == right_id;
}
if let (Some(left_id), Some(right_id)) = (&left.request_id, &right.request_id) {
return left_id == right_id;
}
left.instrument_id == right.instrument_id
}
fn receipt_after_recovery(receipt: &CommandReceipt, report: &ReconcileReport) -> CommandReceipt {
let mut resolved = receipt.clone();
match report.outcome {
ReconcileOutcome::Synchronized => {
resolved.status = CommandStatus::Accepted;
resolved.retriable = false;
resolved.message = Some(
report
.note
.clone()
.unwrap_or_else(|| "command outcome resolved by reconcile".into()),
);
}
ReconcileOutcome::StillUncertain | ReconcileOutcome::Diverged => {
resolved.status = CommandStatus::UnknownExecution;
resolved.retriable = true;
resolved.message =
Some(report.note.clone().unwrap_or_else(|| {
"command outcome remains unresolved after reconcile".into()
}));
}
}
resolved
}
fn timestamp_now_ms() -> bat_markets_core::TimestampMs {
let millis = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|duration| duration.as_millis())
.unwrap_or_default()
.min(i64::MAX as u128) as i64;
bat_markets_core::TimestampMs::new(millis)
}
#[cfg(test)]
mod tests {
use bat_markets_core::{
CommandOperation, CommandReceipt, CommandStatus, Product, ReconcileOutcome,
ReconcileReport, ReconcileTrigger, TimestampMs, Venue,
};
#[test]
fn recovery_synchronized_returns_accepted_non_retriable_receipt() {
let receipt = unknown_receipt();
let report = report(
ReconcileOutcome::Synchronized,
"recent history resolved command",
);
let resolved = super::receipt_after_recovery(&receipt, &report);
assert_eq!(resolved.status, CommandStatus::Accepted);
assert!(!resolved.retriable);
assert_eq!(
resolved.message.as_deref(),
Some("recent history resolved command")
);
}
#[test]
fn recovery_still_uncertain_keeps_receipt_explicitly_unknown() {
let receipt = unknown_receipt();
let report = report(
ReconcileOutcome::StillUncertain,
"1 pending command outcomes still unresolved",
);
let resolved = super::receipt_after_recovery(&receipt, &report);
assert_eq!(resolved.status, CommandStatus::UnknownExecution);
assert!(resolved.retriable);
assert_eq!(
resolved.message.as_deref(),
Some("1 pending command outcomes still unresolved")
);
}
fn unknown_receipt() -> CommandReceipt {
CommandReceipt {
operation: CommandOperation::CreateOrder,
status: CommandStatus::UnknownExecution,
venue: Venue::Binance,
product: Product::LinearUsdt,
instrument_id: None,
order_id: None,
client_order_id: None,
request_id: None,
message: Some("command outcome requires reconcile".into()),
native_code: None,
retriable: true,
}
}
fn report(outcome: ReconcileOutcome, note: &str) -> ReconcileReport {
ReconcileReport {
trigger: ReconcileTrigger::UnknownExecution,
outcome,
repaired_at: TimestampMs::new(1),
note: Some(note.into()),
}
}
}