Skip to main content

rustrade/
handle.rs

//! Cheap cloneable handle into a running [`Bot`](crate::Bot).
//!
//! Host services hold a `BotHandle` to:
//!
//! - Query aggregated health via [`BotHandle::health`].
//! - Trigger shutdown via [`BotHandle::shutdown`].
//! - Await shutdown via [`BotHandle::await_shutdown`].
//! - Feed realised trade outcomes into the risk gates via
//!   [`BotHandle::record_trade_outcome`].
//!
//! The handle is `Clone` so multiple parts of the host service (an HTTP
//! handler, a metrics endpoint, a shutdown coordinator) can each hold one
//! without contention. All shared state is `Arc`-wrapped, so cloning the
//! handle only bumps atomic reference counts — no state is copied.
15
16use std::sync::{Arc, OnceLock};
17
18use rustrade_core::{Brain, Position, Signal, SignalBus, Symbol};
19use rustrade_supervisor::{ServiceLifecycleSnapshot, Supervisor};
20use tokio::sync::broadcast;
21use tokio_util::sync::CancellationToken;
22
23use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
24
25/// Shared slot for the risk persister. Populated at `run_until_shutdown`
26/// startup (after `with_state_store`), so handle clones taken before the
27/// bot runs still observe it once set.
28pub(crate) type PersisterSlot = Arc<OnceLock<RiskPersister>>;
29
30/// Aggregated health snapshot returned by [`BotHandle::health`].
31#[derive(Debug, Clone)]
32pub struct BotHealth {
33    /// `true` iff every brain reports healthy AND no service is in a
34    /// non-alive (terminated) state.
35    pub healthy: bool,
36    /// Whether shutdown has been triggered (signal or `handle.shutdown()`).
37    pub shutting_down: bool,
38    /// Per-service lifecycle snapshots from the supervisor.
39    pub services: Vec<ServiceLifecycleSnapshot>,
40    /// One entry per brain, in the order they were passed to `Bot::new`.
41    pub brains: Vec<BrainHealthSnapshot>,
42}
43
44/// Per-brain health information surfaced in [`BotHealth::brains`].
45#[derive(Debug, Clone)]
46pub struct BrainHealthSnapshot {
47    /// Brain name (see `Brain::name`).
48    pub name: String,
49    /// Whether the brain reports itself healthy.
50    pub healthy: bool,
51    /// Total events processed since startup.
52    pub events_processed: u64,
53    /// Number of non-`Hold` decisions emitted.
54    pub non_hold_decisions: u64,
55    /// Free-form details reported by the brain.
56    pub details: serde_json::Value,
57}
58
59/// Cheap cloneable handle into a running [`Bot`](crate::Bot).
60///
61/// See [the module docs](self) for the host-side contract.
62///
63/// # Example
64///
65/// ```no_run
66/// # use std::sync::Arc;
67/// # use rustrade::{Bot, BotConfig, BotHandle};
68/// # async fn run(
69/// #     exchange: Arc<dyn rustrade_core::ExchangeClient>,
70/// #     brains: Vec<Arc<dyn rustrade_core::Brain>>,
71/// # ) -> anyhow::Result<()> {
72/// let config = BotConfig::builder()
73///     .name("demo")
74///     .symbol("BTCUSDT")
75///     .without_signal_handler()
76///     .build()?;
77/// let bot = Bot::new(config, exchange, brains)?;
78/// let handle: BotHandle = bot.handle();
79///
80/// // Subscribe to non-Hold decisions before the bot starts.
81/// let mut signals = handle.subscribe_signals();
82///
83/// // Drive the bot from one task, observe from another.
84/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
85/// tokio::spawn({
86///     let handle = handle.clone();
87///     async move {
88///         while let Ok(sig) = signals.recv().await {
89///             tracing::info!(?sig, "saw a signal");
90///         }
91///         handle.shutdown();
92///     }
93/// });
94/// task.await??;
95/// # Ok(())
96/// # }
97/// ```
98#[derive(Clone)]
99pub struct BotHandle {
100    cancel: CancellationToken,
101    supervisor: Arc<Supervisor>,
102    brains: Arc<Vec<Arc<dyn Brain>>>,
103    risk: RiskStateMap,
104    positions: PositionCache,
105    signals: SignalBus,
106    persister: PersisterSlot,
107    order_tracker: crate::order_tracker::OrderTracker,
108}
109
110impl BotHandle {
111    pub(crate) fn new(
112        supervisor: Arc<Supervisor>,
113        brains: Arc<Vec<Arc<dyn Brain>>>,
114        risk: RiskStateMap,
115        positions: PositionCache,
116        signals: SignalBus,
117        persister: PersisterSlot,
118        order_tracker: crate::order_tracker::OrderTracker,
119    ) -> Self {
120        Self {
121            cancel: supervisor.cancel_token().clone(),
122            supervisor,
123            brains,
124            risk,
125            positions,
126            signals,
127            persister,
128            order_tracker,
129        }
130    }
131
132    /// Snapshot of the resting orders the framework is currently tracking.
133    ///
134    /// Empty unless order tracking is wired via
135    /// [`Bot::with_order_tracking`](crate::Bot::with_order_tracking) and the
136    /// adapter advertises
137    /// [`Capability::OrderTracking`](rustrade_core::Capability::OrderTracking).
138    pub async fn tracked_orders(&self) -> Vec<crate::order_tracker::TrackedOrder> {
139        self.order_tracker.snapshot().await
140    }
141
142    /// Trigger a graceful shutdown. Fire-and-forget; idempotent.
143    pub fn shutdown(&self) {
144        self.cancel.cancel();
145    }
146
147    /// Has shutdown been triggered?
148    pub fn is_shutting_down(&self) -> bool {
149        self.cancel.is_cancelled()
150    }
151
152    /// Resolves once shutdown has been triggered by anyone (signal,
153    /// `shutdown()` call on this or any other handle clone, or programmatic
154    /// supervisor cancellation).
155    pub async fn await_shutdown(&self) {
156        self.cancel.cancelled().await;
157    }
158
159    /// Feed a realised trade outcome into the per-symbol risk state.
160    ///
161    /// Called by the host (or a brain) when a position closes. Updates
162    /// the symbol's `SessionPnl` and records a win/loss on the
163    /// `CircuitBreaker` based on the **net** PnL.
164    ///
165    /// Non-finite `gross_pnl` / `fee` values are **rejected** (logged at
166    /// error level, risk state unchanged) — a NaN would otherwise make
167    /// the accumulated PnL NaN and permanently disable the loss-limit
168    /// gate.
169    ///
170    /// Phase 2b does not automate this from the fill stream — that's
171    /// `FillRoutingService` territory in Phase 2c.
172    pub async fn record_trade_outcome(&self, symbol: &Symbol, gross_pnl: f64, fee: f64) {
173        // A NaN fed into `record_close` makes the accumulated PnL NaN, and
174        // every subsequent `net_pnl() <= loss_limit` comparison false — the
175        // session halt would silently never fire again. Reject it loudly.
176        if !gross_pnl.is_finite() || !fee.is_finite() {
177            tracing::error!(
178                symbol = %symbol,
179                gross_pnl,
180                fee,
181                "record_trade_outcome: non-finite PnL rejected — risk state unchanged"
182            );
183            return;
184        }
185        {
186            let mut map = self.risk.write().await;
187            let Some(risk) = map.get_mut(symbol) else {
188                tracing::warn!(
189                    symbol = %symbol,
190                    "record_trade_outcome: symbol not in risk state map (was it configured?)"
191                );
192                return;
193            };
194            risk.session_pnl.record_close(gross_pnl, fee);
195            let net = gross_pnl - fee;
196            if net > 0.0 {
197                risk.circuit_breaker.record_win();
198            } else if net < 0.0 {
199                risk.circuit_breaker.record_loss();
200            }
201        }
202        // Persist the updated risk state if a store is wired, so a crash
203        // right after this trade doesn't lose its effect on the gates.
204        if let Some(persister) = self.persister.get() {
205            persister.persist_symbol(&self.risk, symbol).await;
206        }
207    }
208
209    /// Read the current cached position for a symbol, or `Position::FLAT`
210    /// if the symbol isn't tracked.
211    pub async fn position(&self, symbol: &Symbol) -> Position {
212        self.positions
213            .read()
214            .await
215            .get(symbol)
216            .copied()
217            .unwrap_or(Position::FLAT)
218    }
219
220    /// Overwrite the cached position for a symbol. Typically called by the
221    /// host's fill-handling code; Phase 2c's `FillRoutingService` will do
222    /// this automatically.
223    pub async fn set_position(&self, symbol: &Symbol, position: Position) {
224        self.positions
225            .write()
226            .await
227            .insert(symbol.clone(), position);
228    }
229
230    /// Subscribe to the bot's signal stream.
231    ///
232    /// The [`ExecutionService`](crate::execution::ExecutionService)
233    /// publishes a [`Signal`] on every non-`Hold` decision a brain emits,
234    /// *before* the risk gates run. Subscribers see the strategic intent;
235    /// whether each signal was acted on is observable from order logs and
236    /// metrics.
237    ///
238    /// The underlying channel is `tokio::sync::broadcast`, so slow
239    /// subscribers will see `RecvError::Lagged(n)` if they fall behind.
240    ///
241    /// # Subscriber lifetime
242    ///
243    /// Because `BotHandle` keeps a `Sender` clone alive, the channel
244    /// does **not** close when the bot exits. A subscriber that loops on
245    /// `recv()` will block forever after shutdown unless it also watches
246    /// a cancellation signal:
247    ///
248    /// ```ignore
249    /// let mut rx = handle.subscribe_signals();
250    /// let shutdown = host.shutdown_token();
251    /// loop {
252    ///     tokio::select! {
253    ///         _ = shutdown.cancelled() => break,
254    ///         r = rx.recv() => match r {
255    ///             Ok(sig) => handle_signal(sig),
256    ///             Err(_)  => break,
257    ///         },
258    ///     }
259    /// }
260    /// ```
261    pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
262        self.signals.subscribe()
263    }
264
265    /// Number of currently-attached signal subscribers.
266    pub fn signal_subscriber_count(&self) -> usize {
267        self.signals.subscriber_count()
268    }
269
270    /// Snapshot of bot-wide health.
271    pub async fn health(&self) -> BotHealth {
272        let services = self.supervisor.lifecycle_snapshots().await;
273
274        let mut brains = Vec::with_capacity(self.brains.len());
275        for brain in self.brains.iter() {
276            let h = brain.health().await;
277            brains.push(BrainHealthSnapshot {
278                name: brain.name().to_string(),
279                healthy: h.healthy,
280                events_processed: h.events_processed,
281                non_hold_decisions: h.non_hold_decisions,
282                details: h.details,
283            });
284        }
285
286        let all_services_alive = services
287            .iter()
288            .all(|s| !matches!(s.phase, rustrade_supervisor::ServicePhase::Terminated));
289        let all_brains_healthy = brains.iter().all(|b| b.healthy);
290
291        BotHealth {
292            healthy: all_services_alive && all_brains_healthy,
293            shutting_down: self.is_shutting_down(),
294            services,
295            brains,
296        }
297    }
298}
299
300impl std::fmt::Debug for BotHandle {
301    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302        f.debug_struct("BotHandle")
303            .field("shutting_down", &self.is_shutting_down())
304            .field("brain_count", &self.brains.len())
305            .finish_non_exhaustive()
306    }
307}