1use tokio::sync::broadcast;
2
3use bat_markets_core::{
4 AccountSummary, Balance, BookTop, CommandLaneEvent, CommandOperation, CommandReceipt,
5 ErrorKind, Execution, FundingRate, HealthNotification, InstrumentId, InstrumentSpec,
6 Liquidation, MarkPrice, OpenInterest, Order, Position, PrivateLaneEvent, PublicLaneEvent,
7 ReconcileReport, ReconcileTrigger, RequestId, Result, Ticker, TradeTick,
8};
9
10use crate::{
11 client::BatMarkets, diagnostics::RuntimeDiagnosticsSnapshot, native::NativeClient, runtime,
12};
13
14pub struct AdvancedClient<'a> {
21 inner: &'a BatMarkets,
22}
23
24impl<'a> AdvancedClient<'a> {
25 pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
26 Self { inner }
27 }
28
29 pub fn ingest_public_json(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
31 let events = self.inner.adapter.as_adapter().parse_public(payload)?;
32 self.inner.shared.apply_public_events(&events)?;
33 Ok(events)
34 }
35
36 pub fn ingest_private_json(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
38 let events = self.inner.adapter.as_adapter().parse_private(payload)?;
39 self.inner.shared.apply_private_events(&events);
40 Ok(events)
41 }
42
43 #[must_use]
45 pub fn subscribe_public_events(&self) -> broadcast::Receiver<PublicLaneEvent> {
46 self.inner.shared.subscribe_public_events()
47 }
48
49 #[must_use]
51 pub fn subscribe_private_events(&self) -> broadcast::Receiver<PrivateLaneEvent> {
52 self.inner.shared.subscribe_private_events()
53 }
54
55 #[must_use]
57 pub fn subscribe_command_events(&self) -> broadcast::Receiver<CommandLaneEvent> {
58 self.inner.shared.subscribe_command_events()
59 }
60
61 #[must_use]
66 pub fn subscribe_health_notifications(&self) -> broadcast::Receiver<HealthNotification> {
67 self.inner.shared.subscribe_health_notifications()
68 }
69
70 pub fn require_instrument(&self, instrument_id: &InstrumentId) -> Result<InstrumentSpec> {
72 self.inner
73 .instrument_specs()
74 .into_iter()
75 .find(|spec| &spec.instrument_id == instrument_id)
76 .ok_or_else(|| {
77 bat_markets_core::MarketError::new(
78 ErrorKind::Unsupported,
79 format!("unknown instrument {instrument_id}"),
80 )
81 })
82 }
83
84 #[must_use]
86 pub fn cached_ticker(&self, instrument_id: &InstrumentId) -> Option<Ticker> {
87 self.inner
88 .read_state(|state| state.ticker(instrument_id).cloned())
89 }
90
91 #[must_use]
93 pub fn cached_recent_trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
94 self.inner
95 .read_state(|state| state.recent_trades(instrument_id))
96 }
97
98 #[must_use]
100 pub fn cached_book_top(&self, instrument_id: &InstrumentId) -> Option<BookTop> {
101 self.inner
102 .read_state(|state| state.book_top(instrument_id).cloned())
103 }
104
105 #[must_use]
107 pub fn cached_funding_rate(&self, instrument_id: &InstrumentId) -> Option<FundingRate> {
108 self.inner
109 .read_state(|state| state.funding_rate(instrument_id).cloned())
110 }
111
112 #[must_use]
114 pub fn cached_mark_price(&self, instrument_id: &InstrumentId) -> Option<MarkPrice> {
115 self.inner
116 .read_state(|state| state.mark_price(instrument_id).cloned())
117 }
118
119 #[must_use]
121 pub fn cached_open_interest(&self, instrument_id: &InstrumentId) -> Option<OpenInterest> {
122 self.inner
123 .read_state(|state| state.open_interest(instrument_id).cloned())
124 }
125
126 #[must_use]
128 pub fn cached_liquidations(&self, instrument_id: &InstrumentId) -> Option<Vec<Liquidation>> {
129 self.inner
130 .read_state(|state| state.liquidations(instrument_id))
131 }
132
133 #[must_use]
135 pub fn cached_balances(&self) -> Vec<Balance> {
136 self.inner
137 .read_state(bat_markets_core::EngineState::balances)
138 }
139
140 #[must_use]
142 pub fn cached_account_summary(&self) -> Option<AccountSummary> {
143 self.inner
144 .read_state(bat_markets_core::EngineState::account_summary)
145 }
146
147 #[must_use]
149 pub fn cached_positions(&self) -> Vec<Position> {
150 self.inner
151 .read_state(bat_markets_core::EngineState::positions)
152 }
153
154 #[must_use]
156 pub fn cached_orders(&self) -> Vec<Order> {
157 self.inner.read_state(bat_markets_core::EngineState::orders)
158 }
159
160 #[must_use]
162 pub fn cached_open_orders(&self) -> Vec<Order> {
163 self.inner
164 .read_state(bat_markets_core::EngineState::open_orders)
165 }
166
167 #[must_use]
169 pub fn cached_executions(&self) -> Vec<Execution> {
170 self.inner
171 .read_state(bat_markets_core::EngineState::executions)
172 }
173
174 pub fn classify_command_json(
179 &self,
180 operation: CommandOperation,
181 payload: Option<&str>,
182 request_id: Option<RequestId>,
183 ) -> Result<CommandReceipt> {
184 let receipt = self
185 .inner
186 .adapter
187 .as_adapter()
188 .classify_command(operation, payload, request_id)?;
189 self.inner
190 .write_state(|state| state.apply_command_receipt(&receipt));
191 self.inner
192 .shared
193 .emit_command_event(CommandLaneEvent::Receipt(receipt.clone()));
194 Ok(receipt)
195 }
196
197 pub async fn reconcile(&self) -> Result<ReconcileReport> {
199 runtime::reconcile_private(&self.inner.live_context(), ReconcileTrigger::Manual).await
200 }
201
202 #[must_use]
204 pub fn diagnostics(&self) -> RuntimeDiagnosticsSnapshot {
205 let mut snapshot = self.inner.runtime_state.diagnostics.snapshot();
206 snapshot.state_reads = self.inner.shared.read_diagnostics();
207 snapshot.state_writes = self.inner.shared.write_diagnostics();
208 snapshot
209 }
210
211 #[must_use]
213 pub fn native(&self) -> NativeClient<'_> {
214 NativeClient::new(&self.inner.adapter)
215 }
216}