use tokio::sync::broadcast;
use bat_markets_core::{
AccountSummary, Balance, BookTop, CommandLaneEvent, CommandOperation, CommandReceipt,
ErrorKind, Execution, FundingRate, HealthNotification, InstrumentId, InstrumentSpec,
Liquidation, MarkPrice, OpenInterest, Order, Position, PrivateLaneEvent, PublicLaneEvent,
ReconcileReport, ReconcileTrigger, RequestId, Result, Ticker, TradeTick,
};
use crate::{
client::BatMarkets, diagnostics::RuntimeDiagnosticsSnapshot, native::NativeClient, runtime,
};
pub struct AdvancedClient<'a> {
inner: &'a BatMarkets,
}
impl<'a> AdvancedClient<'a> {
pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
Self { inner }
}
pub fn ingest_public_json(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
let events = self.inner.adapter.as_adapter().parse_public(payload)?;
self.inner.shared.apply_public_events(&events)?;
Ok(events)
}
pub fn ingest_private_json(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
let events = self.inner.adapter.as_adapter().parse_private(payload)?;
self.inner.shared.apply_private_events(&events);
Ok(events)
}
#[must_use]
pub fn subscribe_public_events(&self) -> broadcast::Receiver<PublicLaneEvent> {
self.inner.shared.subscribe_public_events()
}
#[must_use]
pub fn subscribe_private_events(&self) -> broadcast::Receiver<PrivateLaneEvent> {
self.inner.shared.subscribe_private_events()
}
#[must_use]
pub fn subscribe_command_events(&self) -> broadcast::Receiver<CommandLaneEvent> {
self.inner.shared.subscribe_command_events()
}
#[must_use]
pub fn subscribe_health_notifications(&self) -> broadcast::Receiver<HealthNotification> {
self.inner.shared.subscribe_health_notifications()
}
pub fn require_instrument(&self, instrument_id: &InstrumentId) -> Result<InstrumentSpec> {
self.inner
.instrument_specs()
.into_iter()
.find(|spec| &spec.instrument_id == instrument_id)
.ok_or_else(|| {
bat_markets_core::MarketError::new(
ErrorKind::Unsupported,
format!("unknown instrument {instrument_id}"),
)
})
}
#[must_use]
pub fn cached_ticker(&self, instrument_id: &InstrumentId) -> Option<Ticker> {
self.inner
.read_state(|state| state.ticker(instrument_id).cloned())
}
#[must_use]
pub fn cached_recent_trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
self.inner
.read_state(|state| state.recent_trades(instrument_id))
}
#[must_use]
pub fn cached_book_top(&self, instrument_id: &InstrumentId) -> Option<BookTop> {
self.inner
.read_state(|state| state.book_top(instrument_id).cloned())
}
#[must_use]
pub fn cached_funding_rate(&self, instrument_id: &InstrumentId) -> Option<FundingRate> {
self.inner
.read_state(|state| state.funding_rate(instrument_id).cloned())
}
#[must_use]
pub fn cached_mark_price(&self, instrument_id: &InstrumentId) -> Option<MarkPrice> {
self.inner
.read_state(|state| state.mark_price(instrument_id).cloned())
}
#[must_use]
pub fn cached_open_interest(&self, instrument_id: &InstrumentId) -> Option<OpenInterest> {
self.inner
.read_state(|state| state.open_interest(instrument_id).cloned())
}
#[must_use]
pub fn cached_liquidations(&self, instrument_id: &InstrumentId) -> Option<Vec<Liquidation>> {
self.inner
.read_state(|state| state.liquidations(instrument_id))
}
#[must_use]
pub fn cached_balances(&self) -> Vec<Balance> {
self.inner
.read_state(bat_markets_core::EngineState::balances)
}
#[must_use]
pub fn cached_account_summary(&self) -> Option<AccountSummary> {
self.inner
.read_state(bat_markets_core::EngineState::account_summary)
}
#[must_use]
pub fn cached_positions(&self) -> Vec<Position> {
self.inner
.read_state(bat_markets_core::EngineState::positions)
}
#[must_use]
pub fn cached_orders(&self) -> Vec<Order> {
self.inner.read_state(bat_markets_core::EngineState::orders)
}
#[must_use]
pub fn cached_open_orders(&self) -> Vec<Order> {
self.inner
.read_state(bat_markets_core::EngineState::open_orders)
}
#[must_use]
pub fn cached_executions(&self) -> Vec<Execution> {
self.inner
.read_state(bat_markets_core::EngineState::executions)
}
pub fn classify_command_json(
&self,
operation: CommandOperation,
payload: Option<&str>,
request_id: Option<RequestId>,
) -> Result<CommandReceipt> {
let receipt = self
.inner
.adapter
.as_adapter()
.classify_command(operation, payload, request_id)?;
self.inner
.write_state(|state| state.apply_command_receipt(&receipt));
self.inner
.shared
.emit_command_event(CommandLaneEvent::Receipt(receipt.clone()));
Ok(receipt)
}
pub async fn reconcile(&self) -> Result<ReconcileReport> {
runtime::reconcile_private(&self.inner.live_context(), ReconcileTrigger::Manual).await
}
#[must_use]
pub fn diagnostics(&self) -> RuntimeDiagnosticsSnapshot {
let mut snapshot = self.inner.runtime_state.diagnostics.snapshot();
snapshot.state_reads = self.inner.shared.read_diagnostics();
snapshot.state_writes = self.inner.shared.write_diagnostics();
snapshot
}
#[must_use]
pub fn native(&self) -> NativeClient<'_> {
NativeClient::new(&self.inner.adapter)
}
}