rustrade/handle.rs
1//! Cheap cloneable handle into a running [`Bot`](crate::Bot).
2//!
3//! Host services hold a `BotHandle` to:
4//!
5//! - Query aggregated health via [`BotHandle::health`].
6//! - Trigger shutdown via [`BotHandle::shutdown`].
7//! - Await shutdown via [`BotHandle::await_shutdown`].
8//! - Feed realised trade outcomes into the risk gates via
9//! [`BotHandle::record_trade_outcome`].
10//!
11//! The handle is `Clone` so multiple parts of the host service (an HTTP
12//! handler, a metrics endpoint, a shutdown coordinator) can hold one
13//! without contention. All shared state is `Arc`-wrapped, so a clone is
14//! an atomic-ref-count bump.
15
16use std::sync::{Arc, OnceLock};
17
18use rustrade_core::{Brain, Position, Signal, SignalBus, Symbol};
19use rustrade_supervisor::{ServiceLifecycleSnapshot, Supervisor};
20use tokio::sync::broadcast;
21use tokio_util::sync::CancellationToken;
22
23use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
24
/// Shared slot for the risk persister. Populated at `run_until_shutdown`
/// startup (after `with_state_store`), so handle clones taken before the
/// bot runs still observe it once set.
///
/// The inner `OnceLock` can be initialised at most once and reads as empty
/// (`get()` returns `None`) until then; the outer `Arc` is what lets every
/// holder of the slot share that single cell.
pub(crate) type PersisterSlot = Arc<OnceLock<RiskPersister>>;
29
/// Aggregated health snapshot returned by [`BotHandle::health`].
///
/// All fields are owned values, so the snapshot can be cloned, logged or
/// serialised by the host without holding on to the handle.
#[derive(Debug, Clone)]
pub struct BotHealth {
    /// `true` iff every brain reports healthy AND no service is in a
    /// non-alive (terminated) state.
    ///
    /// Both conditions are "for all" checks, so this is vacuously `true`
    /// when there are no brains and no services. It is computed
    /// independently of [`shutting_down`](Self::shutting_down).
    pub healthy: bool,
    /// Whether shutdown has been triggered (signal or `handle.shutdown()`).
    pub shutting_down: bool,
    /// Per-service lifecycle snapshots from the supervisor.
    pub services: Vec<ServiceLifecycleSnapshot>,
    /// One entry per brain, in the order they were passed to `Bot::new`.
    pub brains: Vec<BrainHealthSnapshot>,
}
43
/// Per-brain health information surfaced in [`BotHealth::brains`].
///
/// Built by [`BotHandle::health`]: `name` comes from `Brain::name`, the
/// remaining fields are copied one-for-one from the value returned by
/// `Brain::health`.
#[derive(Debug, Clone)]
pub struct BrainHealthSnapshot {
    /// Brain name (see `Brain::name`).
    pub name: String,
    /// Whether the brain reports itself healthy.
    pub healthy: bool,
    /// Total events processed since startup.
    pub events_processed: u64,
    /// Number of non-`Hold` decisions emitted.
    pub non_hold_decisions: u64,
    /// Free-form details reported by the brain.
    pub details: serde_json::Value,
}
58
/// Cheap cloneable handle into a running [`Bot`](crate::Bot).
///
/// See [the module docs](self) for the host-side contract.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use rustrade::{Bot, BotConfig, BotHandle};
/// # async fn run(
/// #     exchange: Arc<dyn rustrade_core::ExchangeClient>,
/// #     brains: Vec<Arc<dyn rustrade_core::Brain>>,
/// # ) -> anyhow::Result<()> {
/// let config = BotConfig::builder()
///     .name("demo")
///     .symbol("BTCUSDT")
///     .without_signal_handler()
///     .build()?;
/// let bot = Bot::new(config, exchange, brains)?;
/// let handle: BotHandle = bot.handle();
///
/// // Subscribe to non-Hold decisions before the bot starts.
/// let mut signals = handle.subscribe_signals();
///
/// // Drive the bot from one task, observe from another.
/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
/// tokio::spawn({
///     let handle = handle.clone();
///     async move {
///         // `recv()` yields `Err` when this subscriber lags as well as on
///         // close, so the loop below also ends (and the bot is shut down)
///         // if the observer falls behind.
///         while let Ok(sig) = signals.recv().await {
///             tracing::info!(?sig, "saw a signal");
///         }
///         handle.shutdown();
///     }
/// });
/// task.await??;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct BotHandle {
    /// Clone of the supervisor's cancellation token; backs `shutdown`,
    /// `is_shutting_down` and `await_shutdown`.
    cancel: CancellationToken,
    /// Source of the per-service lifecycle snapshots reported by `health`.
    supervisor: Arc<Supervisor>,
    /// Shared list of brains polled by `health`.
    brains: Arc<Vec<Arc<dyn Brain>>>,
    /// Per-symbol risk state updated by `record_trade_outcome`.
    risk: RiskStateMap,
    /// Cached position per symbol; read by `position`, overwritten by
    /// `set_position`.
    positions: PositionCache,
    /// Bus that `subscribe_signals` attaches new receivers to.
    signals: SignalBus,
    /// Late-initialised risk persister; empty until one has been set.
    persister: PersisterSlot,
    /// Source of the snapshot returned by `tracked_orders`.
    order_tracker: crate::order_tracker::OrderTracker,
}
109
110impl BotHandle {
111 pub(crate) fn new(
112 supervisor: Arc<Supervisor>,
113 brains: Arc<Vec<Arc<dyn Brain>>>,
114 risk: RiskStateMap,
115 positions: PositionCache,
116 signals: SignalBus,
117 persister: PersisterSlot,
118 order_tracker: crate::order_tracker::OrderTracker,
119 ) -> Self {
120 Self {
121 cancel: supervisor.cancel_token().clone(),
122 supervisor,
123 brains,
124 risk,
125 positions,
126 signals,
127 persister,
128 order_tracker,
129 }
130 }
131
132 /// Snapshot of the resting orders the framework is currently tracking.
133 ///
134 /// Empty unless order tracking is wired via
135 /// [`Bot::with_order_tracking`](crate::Bot::with_order_tracking) and the
136 /// adapter advertises
137 /// [`Capability::OrderTracking`](rustrade_core::Capability::OrderTracking).
138 pub async fn tracked_orders(&self) -> Vec<crate::order_tracker::TrackedOrder> {
139 self.order_tracker.snapshot().await
140 }
141
142 /// Trigger a graceful shutdown. Fire-and-forget; idempotent.
143 pub fn shutdown(&self) {
144 self.cancel.cancel();
145 }
146
147 /// Has shutdown been triggered?
148 pub fn is_shutting_down(&self) -> bool {
149 self.cancel.is_cancelled()
150 }
151
152 /// Resolves once shutdown has been triggered by anyone (signal,
153 /// `shutdown()` call on this or any other handle clone, or programmatic
154 /// supervisor cancellation).
155 pub async fn await_shutdown(&self) {
156 self.cancel.cancelled().await;
157 }
158
159 /// Feed a realised trade outcome into the per-symbol risk state.
160 ///
161 /// Called by the host (or a brain) when a position closes. Updates
162 /// the symbol's `SessionPnl` and records a win/loss on the
163 /// `CircuitBreaker` based on the **net** PnL.
164 ///
165 /// Phase 2b does not automate this from the fill stream — that's
166 /// `FillRoutingService` territory in Phase 2c.
167 pub async fn record_trade_outcome(&self, symbol: &Symbol, gross_pnl: f64, fee: f64) {
168 {
169 let mut map = self.risk.write().await;
170 let Some(risk) = map.get_mut(symbol) else {
171 tracing::warn!(
172 symbol = %symbol,
173 "record_trade_outcome: symbol not in risk state map (was it configured?)"
174 );
175 return;
176 };
177 risk.session_pnl.record_close(gross_pnl, fee);
178 let net = gross_pnl - fee;
179 if net > 0.0 {
180 risk.circuit_breaker.record_win();
181 } else if net < 0.0 {
182 risk.circuit_breaker.record_loss();
183 }
184 }
185 // Persist the updated risk state if a store is wired, so a crash
186 // right after this trade doesn't lose its effect on the gates.
187 if let Some(persister) = self.persister.get() {
188 persister.persist_symbol(&self.risk, symbol).await;
189 }
190 }
191
192 /// Read the current cached position for a symbol, or `Position::FLAT`
193 /// if the symbol isn't tracked.
194 pub async fn position(&self, symbol: &Symbol) -> Position {
195 self.positions
196 .read()
197 .await
198 .get(symbol)
199 .copied()
200 .unwrap_or(Position::FLAT)
201 }
202
203 /// Overwrite the cached position for a symbol. Typically called by the
204 /// host's fill-handling code; Phase 2c's `FillRoutingService` will do
205 /// this automatically.
206 pub async fn set_position(&self, symbol: &Symbol, position: Position) {
207 self.positions
208 .write()
209 .await
210 .insert(symbol.clone(), position);
211 }
212
213 /// Subscribe to the bot's signal stream.
214 ///
215 /// The [`ExecutionService`](crate::execution::ExecutionService)
216 /// publishes a [`Signal`] on every non-`Hold` decision a brain emits,
217 /// *before* the risk gates run. Subscribers see the strategic intent;
218 /// whether each signal was acted on is observable from order logs and
219 /// metrics.
220 ///
221 /// The underlying channel is `tokio::sync::broadcast`, so slow
222 /// subscribers will see `RecvError::Lagged(n)` if they fall behind.
223 ///
224 /// # Subscriber lifetime
225 ///
226 /// Because `BotHandle` keeps a `Sender` clone alive, the channel
227 /// does **not** close when the bot exits. A subscriber that loops on
228 /// `recv()` will block forever after shutdown unless it also watches
229 /// a cancellation signal:
230 ///
231 /// ```ignore
232 /// let mut rx = handle.subscribe_signals();
233 /// let shutdown = host.shutdown_token();
234 /// loop {
235 /// tokio::select! {
236 /// _ = shutdown.cancelled() => break,
237 /// r = rx.recv() => match r {
238 /// Ok(sig) => handle_signal(sig),
239 /// Err(_) => break,
240 /// },
241 /// }
242 /// }
243 /// ```
244 pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
245 self.signals.subscribe()
246 }
247
248 /// Number of currently-attached signal subscribers.
249 pub fn signal_subscriber_count(&self) -> usize {
250 self.signals.subscriber_count()
251 }
252
253 /// Snapshot of bot-wide health.
254 pub async fn health(&self) -> BotHealth {
255 let services = self.supervisor.lifecycle_snapshots().await;
256
257 let mut brains = Vec::with_capacity(self.brains.len());
258 for brain in self.brains.iter() {
259 let h = brain.health().await;
260 brains.push(BrainHealthSnapshot {
261 name: brain.name().to_string(),
262 healthy: h.healthy,
263 events_processed: h.events_processed,
264 non_hold_decisions: h.non_hold_decisions,
265 details: h.details,
266 });
267 }
268
269 let all_services_alive = services
270 .iter()
271 .all(|s| !matches!(s.phase, rustrade_supervisor::ServicePhase::Terminated));
272 let all_brains_healthy = brains.iter().all(|b| b.healthy);
273
274 BotHealth {
275 healthy: all_services_alive && all_brains_healthy,
276 shutting_down: self.is_shutting_down(),
277 services,
278 brains,
279 }
280 }
281}
282
283impl std::fmt::Debug for BotHandle {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 f.debug_struct("BotHandle")
286 .field("shutting_down", &self.is_shutting_down())
287 .field("brain_count", &self.brains.len())
288 .finish_non_exhaustive()
289 }
290}