rustrade/handle.rs
1//! Cheap cloneable handle into a running [`Bot`](crate::Bot).
2//!
3//! Host services hold a `BotHandle` to:
4//!
5//! - Query aggregated health via [`BotHandle::health`].
6//! - Trigger shutdown via [`BotHandle::shutdown`].
7//! - Await shutdown via [`BotHandle::await_shutdown`].
8//! - Feed realised trade outcomes into the risk gates via
9//! [`BotHandle::record_trade_outcome`].
10//!
11//! The handle is `Clone` so multiple parts of the host service (an HTTP
12//! handler, a metrics endpoint, a shutdown coordinator) can hold one
13//! without contention. All shared state is `Arc`-wrapped, so a clone is
14//! an atomic-ref-count bump.
15
16use std::sync::{Arc, OnceLock};
17
18use rustrade_core::{Brain, Position, Signal, SignalBus, Symbol};
19use rustrade_supervisor::{ServiceLifecycleSnapshot, Supervisor};
20use tokio::sync::broadcast;
21use tokio_util::sync::CancellationToken;
22
23use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
24
/// Shared, write-once slot for the risk persister.
///
/// Populated at `run_until_shutdown` startup (after `with_state_store`).
/// Because the slot itself is `Arc`-shared, handle clones taken before the
/// bot runs still observe the persister once it is set. Readers call
/// `get()` and treat an empty slot as "no store wired".
pub(crate) type PersisterSlot = Arc<OnceLock<RiskPersister>>;
29
/// Aggregated health snapshot returned by [`BotHandle::health`].
///
/// The snapshot is a point-in-time copy: it owns its data and does not
/// update after it has been returned.
#[derive(Debug, Clone)]
pub struct BotHealth {
    /// `true` iff every brain reports healthy AND no service is in a
    /// non-alive (terminated) state. With no brains and no services this
    /// is vacuously `true`.
    pub healthy: bool,
    /// Whether shutdown has been triggered (signal or `handle.shutdown()`).
    /// Independent of `healthy`: a bot can be healthy and shutting down.
    pub shutting_down: bool,
    /// Per-service lifecycle snapshots from the supervisor.
    pub services: Vec<ServiceLifecycleSnapshot>,
    /// One entry per brain, in the order they were passed to `Bot::new`.
    pub brains: Vec<BrainHealthSnapshot>,
}
43
/// Per-brain health information surfaced in [`BotHealth::brains`].
///
/// Apart from `name`, every field is copied verbatim from the value the
/// brain returns from its own `health()` call.
#[derive(Debug, Clone)]
pub struct BrainHealthSnapshot {
    /// Brain name (see `Brain::name`).
    pub name: String,
    /// Whether the brain reports itself healthy.
    pub healthy: bool,
    /// Total events processed since startup.
    pub events_processed: u64,
    /// Number of non-`Hold` decisions emitted.
    pub non_hold_decisions: u64,
    /// Free-form details reported by the brain. The schema is defined by
    /// each brain; this crate passes it through untouched.
    pub details: serde_json::Value,
}
58
/// Cheap cloneable handle into a running [`Bot`](crate::Bot).
///
/// See [the module docs](self) for the host-side contract.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use rustrade::{Bot, BotConfig, BotHandle};
/// # async fn run(
/// #     exchange: Arc<dyn rustrade_core::ExchangeClient>,
/// #     brains: Vec<Arc<dyn rustrade_core::Brain>>,
/// # ) -> anyhow::Result<()> {
/// let config = BotConfig::builder()
///     .name("demo")
///     .symbol("BTCUSDT")
///     .without_signal_handler()
///     .build()?;
/// let bot = Bot::new(config, exchange, brains)?;
/// let handle: BotHandle = bot.handle();
///
/// // Subscribe to non-Hold decisions before the bot starts.
/// let mut signals = handle.subscribe_signals();
///
/// // Drive the bot from one task, observe from another.
/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
/// tokio::spawn({
///     let handle = handle.clone();
///     async move {
///         // NOTE: the channel stays open for as long as any `BotHandle`
///         // is alive (see `subscribe_signals`), so in practice this loop
///         // ends only when the subscriber lags. Long-lived observers
///         // should also watch `handle.await_shutdown()`.
///         while let Ok(sig) = signals.recv().await {
///             tracing::info!(?sig, "saw a signal");
///         }
///         handle.shutdown();
///     }
/// });
/// task.await??;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct BotHandle {
    // Clone of the supervisor's cancellation token; backs `shutdown`,
    // `is_shutting_down` and `await_shutdown`.
    cancel: CancellationToken,
    // Source of the per-service lifecycle snapshots reported by `health`.
    supervisor: Arc<Supervisor>,
    // Brains polled by `health`, in the order they are stored.
    brains: Arc<Vec<Arc<dyn Brain>>>,
    // Per-symbol risk state updated by `record_trade_outcome`.
    risk: RiskStateMap,
    // Per-symbol position cache behind `position` / `set_position`.
    positions: PositionCache,
    // Signal bus behind `subscribe_signals` / `signal_subscriber_count`.
    signals: SignalBus,
    // Write-once slot for the risk persister; empty until a store is wired.
    persister: PersisterSlot,
    // Tracker whose snapshot is returned by `tracked_orders`.
    order_tracker: crate::order_tracker::OrderTracker,
}
109
110impl BotHandle {
111 pub(crate) fn new(
112 supervisor: Arc<Supervisor>,
113 brains: Arc<Vec<Arc<dyn Brain>>>,
114 risk: RiskStateMap,
115 positions: PositionCache,
116 signals: SignalBus,
117 persister: PersisterSlot,
118 order_tracker: crate::order_tracker::OrderTracker,
119 ) -> Self {
120 Self {
121 cancel: supervisor.cancel_token().clone(),
122 supervisor,
123 brains,
124 risk,
125 positions,
126 signals,
127 persister,
128 order_tracker,
129 }
130 }
131
132 /// Snapshot of the resting orders the framework is currently tracking.
133 ///
134 /// Empty unless order tracking is wired via
135 /// [`Bot::with_order_tracking`](crate::Bot::with_order_tracking) and the
136 /// adapter advertises
137 /// [`Capability::OrderTracking`](rustrade_core::Capability::OrderTracking).
138 pub async fn tracked_orders(&self) -> Vec<crate::order_tracker::TrackedOrder> {
139 self.order_tracker.snapshot().await
140 }
141
142 /// Trigger a graceful shutdown. Fire-and-forget; idempotent.
143 pub fn shutdown(&self) {
144 self.cancel.cancel();
145 }
146
147 /// Has shutdown been triggered?
148 pub fn is_shutting_down(&self) -> bool {
149 self.cancel.is_cancelled()
150 }
151
152 /// Resolves once shutdown has been triggered by anyone (signal,
153 /// `shutdown()` call on this or any other handle clone, or programmatic
154 /// supervisor cancellation).
155 pub async fn await_shutdown(&self) {
156 self.cancel.cancelled().await;
157 }
158
159 /// Feed a realised trade outcome into the per-symbol risk state.
160 ///
161 /// Called by the host (or a brain) when a position closes. Updates
162 /// the symbol's `SessionPnl` and records a win/loss on the
163 /// `CircuitBreaker` based on the **net** PnL.
164 ///
165 /// Non-finite `gross_pnl` / `fee` values are **rejected** (logged at
166 /// error level, risk state unchanged) — a NaN would otherwise make
167 /// the accumulated PnL NaN and permanently disable the loss-limit
168 /// gate.
169 ///
170 /// Phase 2b does not automate this from the fill stream — that's
171 /// `FillRoutingService` territory in Phase 2c.
172 pub async fn record_trade_outcome(&self, symbol: &Symbol, gross_pnl: f64, fee: f64) {
173 // A NaN fed into `record_close` makes the accumulated PnL NaN, and
174 // every subsequent `net_pnl() <= loss_limit` comparison false — the
175 // session halt would silently never fire again. Reject it loudly.
176 if !gross_pnl.is_finite() || !fee.is_finite() {
177 tracing::error!(
178 symbol = %symbol,
179 gross_pnl,
180 fee,
181 "record_trade_outcome: non-finite PnL rejected — risk state unchanged"
182 );
183 return;
184 }
185 {
186 let mut map = self.risk.write().await;
187 let Some(risk) = map.get_mut(symbol) else {
188 tracing::warn!(
189 symbol = %symbol,
190 "record_trade_outcome: symbol not in risk state map (was it configured?)"
191 );
192 return;
193 };
194 risk.session_pnl.record_close(gross_pnl, fee);
195 let net = gross_pnl - fee;
196 if net > 0.0 {
197 risk.circuit_breaker.record_win();
198 } else if net < 0.0 {
199 risk.circuit_breaker.record_loss();
200 }
201 }
202 // Persist the updated risk state if a store is wired, so a crash
203 // right after this trade doesn't lose its effect on the gates.
204 if let Some(persister) = self.persister.get() {
205 persister.persist_symbol(&self.risk, symbol).await;
206 }
207 }
208
209 /// Read the current cached position for a symbol, or `Position::FLAT`
210 /// if the symbol isn't tracked.
211 pub async fn position(&self, symbol: &Symbol) -> Position {
212 self.positions
213 .read()
214 .await
215 .get(symbol)
216 .copied()
217 .unwrap_or(Position::FLAT)
218 }
219
220 /// Overwrite the cached position for a symbol. Typically called by the
221 /// host's fill-handling code; Phase 2c's `FillRoutingService` will do
222 /// this automatically.
223 pub async fn set_position(&self, symbol: &Symbol, position: Position) {
224 self.positions
225 .write()
226 .await
227 .insert(symbol.clone(), position);
228 }
229
230 /// Subscribe to the bot's signal stream.
231 ///
232 /// The [`ExecutionService`](crate::execution::ExecutionService)
233 /// publishes a [`Signal`] on every non-`Hold` decision a brain emits,
234 /// *before* the risk gates run. Subscribers see the strategic intent;
235 /// whether each signal was acted on is observable from order logs and
236 /// metrics.
237 ///
238 /// The underlying channel is `tokio::sync::broadcast`, so slow
239 /// subscribers will see `RecvError::Lagged(n)` if they fall behind.
240 ///
241 /// # Subscriber lifetime
242 ///
243 /// Because `BotHandle` keeps a `Sender` clone alive, the channel
244 /// does **not** close when the bot exits. A subscriber that loops on
245 /// `recv()` will block forever after shutdown unless it also watches
246 /// a cancellation signal:
247 ///
248 /// ```ignore
249 /// let mut rx = handle.subscribe_signals();
250 /// let shutdown = host.shutdown_token();
251 /// loop {
252 /// tokio::select! {
253 /// _ = shutdown.cancelled() => break,
254 /// r = rx.recv() => match r {
255 /// Ok(sig) => handle_signal(sig),
256 /// Err(_) => break,
257 /// },
258 /// }
259 /// }
260 /// ```
261 pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
262 self.signals.subscribe()
263 }
264
265 /// Number of currently-attached signal subscribers.
266 pub fn signal_subscriber_count(&self) -> usize {
267 self.signals.subscriber_count()
268 }
269
270 /// Snapshot of bot-wide health.
271 pub async fn health(&self) -> BotHealth {
272 let services = self.supervisor.lifecycle_snapshots().await;
273
274 let mut brains = Vec::with_capacity(self.brains.len());
275 for brain in self.brains.iter() {
276 let h = brain.health().await;
277 brains.push(BrainHealthSnapshot {
278 name: brain.name().to_string(),
279 healthy: h.healthy,
280 events_processed: h.events_processed,
281 non_hold_decisions: h.non_hold_decisions,
282 details: h.details,
283 });
284 }
285
286 let all_services_alive = services
287 .iter()
288 .all(|s| !matches!(s.phase, rustrade_supervisor::ServicePhase::Terminated));
289 let all_brains_healthy = brains.iter().all(|b| b.healthy);
290
291 BotHealth {
292 healthy: all_services_alive && all_brains_healthy,
293 shutting_down: self.is_shutting_down(),
294 services,
295 brains,
296 }
297 }
298}
299
300impl std::fmt::Debug for BotHandle {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 f.debug_struct("BotHandle")
303 .field("shutting_down", &self.is_shutting_down())
304 .field("brain_count", &self.brains.len())
305 .finish_non_exhaustive()
306 }
307}