Skip to main content

bat_markets/
advanced.rs

1use tokio::sync::broadcast;
2
3use bat_markets_core::{
4    AccountSummary, Balance, BookTop, CommandLaneEvent, CommandOperation, CommandReceipt,
5    ErrorKind, Execution, FundingRate, HealthNotification, InstrumentId, InstrumentSpec,
6    Liquidation, MarkPrice, OpenInterest, Order, Position, PrivateLaneEvent, PublicLaneEvent,
7    ReconcileReport, ReconcileTrigger, RequestId, Result, Ticker, TradeTick,
8};
9
10use crate::{
11    client::BatMarkets, diagnostics::RuntimeDiagnosticsSnapshot, native::NativeClient, runtime,
12};
13
14/// Low-level escape hatch for replay tools, custom transports, and diagnostics.
15///
16/// The primary API is exposed directly on [`BatMarkets`]. Use `advanced()` only
17/// when an integration needs raw lane events, fixture ingestion, command
18/// response classification, manual reconciliation, diagnostics, or
19/// venue-specific native adapter access.
20pub struct AdvancedClient<'a> {
21    inner: &'a BatMarkets,
22}
23
24impl<'a> AdvancedClient<'a> {
25    pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
26        Self { inner }
27    }
28
29    /// Decode a venue public websocket payload and merge its events into state.
30    pub fn ingest_public_json(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
31        let events = self.inner.adapter.as_adapter().parse_public(payload)?;
32        self.inner.shared.apply_public_events(&events)?;
33        Ok(events)
34    }
35
36    /// Decode a venue private websocket payload and merge its events into state.
37    pub fn ingest_private_json(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
38        let events = self.inner.adapter.as_adapter().parse_private(payload)?;
39        self.inner.shared.apply_private_events(&events);
40        Ok(events)
41    }
42
43    /// Subscribe to raw public-lane events emitted by ingest or live runtime.
44    #[must_use]
45    pub fn subscribe_public_events(&self) -> broadcast::Receiver<PublicLaneEvent> {
46        self.inner.shared.subscribe_public_events()
47    }
48
49    /// Subscribe to raw private-lane events emitted by ingest or live runtime.
50    #[must_use]
51    pub fn subscribe_private_events(&self) -> broadcast::Receiver<PrivateLaneEvent> {
52        self.inner.shared.subscribe_private_events()
53    }
54
55    /// Subscribe to raw command-lane lifecycle and receipt events.
56    #[must_use]
57    pub fn subscribe_command_events(&self) -> broadcast::Receiver<CommandLaneEvent> {
58        self.inner.shared.subscribe_command_events()
59    }
60
61    /// Subscribe to transition-style health notifications.
62    ///
63    /// Notifications are emitted for structural runtime changes, not for every
64    /// market-data tick. Most applications should prefer [`BatMarkets::watch_status`].
65    #[must_use]
66    pub fn subscribe_health_notifications(&self) -> broadcast::Receiver<HealthNotification> {
67        self.inner.shared.subscribe_health_notifications()
68    }
69
70    /// Return metadata for a known instrument or an `Unsupported` error.
71    pub fn require_instrument(&self, instrument_id: &InstrumentId) -> Result<InstrumentSpec> {
72        self.inner
73            .instrument_specs()
74            .into_iter()
75            .find(|spec| &spec.instrument_id == instrument_id)
76            .ok_or_else(|| {
77                bat_markets_core::MarketError::new(
78                    ErrorKind::Unsupported,
79                    format!("unknown instrument {instrument_id}"),
80                )
81            })
82    }
83
84    /// Return the latest cached ticker for an instrument.
85    #[must_use]
86    pub fn cached_ticker(&self, instrument_id: &InstrumentId) -> Option<Ticker> {
87        self.inner
88            .read_state(|state| state.ticker(instrument_id).cloned())
89    }
90
91    /// Return cached recent public trades for an instrument.
92    #[must_use]
93    pub fn cached_recent_trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
94        self.inner
95            .read_state(|state| state.recent_trades(instrument_id))
96    }
97
98    /// Return the latest cached top-of-book snapshot for an instrument.
99    #[must_use]
100    pub fn cached_book_top(&self, instrument_id: &InstrumentId) -> Option<BookTop> {
101        self.inner
102            .read_state(|state| state.book_top(instrument_id).cloned())
103    }
104
105    /// Return the latest cached funding-rate snapshot for an instrument.
106    #[must_use]
107    pub fn cached_funding_rate(&self, instrument_id: &InstrumentId) -> Option<FundingRate> {
108        self.inner
109            .read_state(|state| state.funding_rate(instrument_id).cloned())
110    }
111
112    /// Return the latest cached mark-price snapshot for an instrument.
113    #[must_use]
114    pub fn cached_mark_price(&self, instrument_id: &InstrumentId) -> Option<MarkPrice> {
115        self.inner
116            .read_state(|state| state.mark_price(instrument_id).cloned())
117    }
118
119    /// Return the latest cached open-interest snapshot for an instrument.
120    #[must_use]
121    pub fn cached_open_interest(&self, instrument_id: &InstrumentId) -> Option<OpenInterest> {
122        self.inner
123            .read_state(|state| state.open_interest(instrument_id).cloned())
124    }
125
126    /// Return cached liquidation events for an instrument.
127    #[must_use]
128    pub fn cached_liquidations(&self, instrument_id: &InstrumentId) -> Option<Vec<Liquidation>> {
129        self.inner
130            .read_state(|state| state.liquidations(instrument_id))
131    }
132
133    /// Return cached balances from the private state projection.
134    #[must_use]
135    pub fn cached_balances(&self) -> Vec<Balance> {
136        self.inner
137            .read_state(bat_markets_core::EngineState::balances)
138    }
139
140    /// Return the cached account summary, if one has been observed or fetched.
141    #[must_use]
142    pub fn cached_account_summary(&self) -> Option<AccountSummary> {
143        self.inner
144            .read_state(bat_markets_core::EngineState::account_summary)
145    }
146
147    /// Return cached positions from the private state projection.
148    #[must_use]
149    pub fn cached_positions(&self) -> Vec<Position> {
150        self.inner
151            .read_state(bat_markets_core::EngineState::positions)
152    }
153
154    /// Return all cached orders known to the engine state.
155    #[must_use]
156    pub fn cached_orders(&self) -> Vec<Order> {
157        self.inner.read_state(bat_markets_core::EngineState::orders)
158    }
159
160    /// Return cached orders that are currently open.
161    #[must_use]
162    pub fn cached_open_orders(&self) -> Vec<Order> {
163        self.inner
164            .read_state(bat_markets_core::EngineState::open_orders)
165    }
166
167    /// Return cached private execution fills known to the engine state.
168    #[must_use]
169    pub fn cached_executions(&self) -> Vec<Execution> {
170        self.inner
171            .read_state(bat_markets_core::EngineState::executions)
172    }
173
174    /// Classify a raw command response payload and apply the resulting state hint.
175    ///
176    /// This hook is intended for custom transports. Normal write flows should
177    /// use root command methods such as [`BatMarkets::create_order`].
178    pub fn classify_command_json(
179        &self,
180        operation: CommandOperation,
181        payload: Option<&str>,
182        request_id: Option<RequestId>,
183    ) -> Result<CommandReceipt> {
184        let receipt = self
185            .inner
186            .adapter
187            .as_adapter()
188            .classify_command(operation, payload, request_id)?;
189        self.inner
190            .write_state(|state| state.apply_command_receipt(&receipt));
191        self.inner
192            .shared
193            .emit_command_event(CommandLaneEvent::Receipt(receipt.clone()));
194        Ok(receipt)
195    }
196
197    /// Run a manual REST-backed private-state reconciliation pass.
198    pub async fn reconcile(&self) -> Result<ReconcileReport> {
199        runtime::reconcile_private(&self.inner.live_context(), ReconcileTrigger::Manual).await
200    }
201
202    /// Return local runtime and state-lock diagnostics.
203    #[must_use]
204    pub fn diagnostics(&self) -> RuntimeDiagnosticsSnapshot {
205        let mut snapshot = self.inner.runtime_state.diagnostics.snapshot();
206        snapshot.state_reads = self.inner.shared.read_diagnostics();
207        snapshot.state_writes = self.inner.shared.write_diagnostics();
208        snapshot
209    }
210
211    /// Access venue-specific adapter functionality.
212    #[must_use]
213    pub fn native(&self) -> NativeClient<'_> {
214        NativeClient::new(&self.inner.adapter)
215    }
216}