Skip to main content

rustrade/
handle.rs

1//! Cheap cloneable handle into a running [`Bot`](crate::Bot).
2//!
3//! Host services hold a `BotHandle` to:
4//!
5//! - Query aggregated health via [`BotHandle::health`].
6//! - Trigger shutdown via [`BotHandle::shutdown`].
7//! - Await shutdown via [`BotHandle::await_shutdown`].
8//! - Feed realised trade outcomes into the risk gates via
9//!   [`BotHandle::record_trade_outcome`].
10//!
//! The handle is `Clone` so multiple parts of the host service (an HTTP
//! handler, a metrics endpoint, a shutdown coordinator) can each hold their
//! own copy. All shared state is `Arc`-wrapped, so cloning a handle costs a
//! few reference-count bumps rather than a deep copy of any state.
15
16use std::sync::{Arc, OnceLock};
17
18use rustrade_core::{Brain, Position, Signal, SignalBus, Symbol};
19use rustrade_supervisor::{ServiceLifecycleSnapshot, Supervisor};
20use tokio::sync::broadcast;
21use tokio_util::sync::CancellationToken;
22
23use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
24
25/// Shared slot for the risk persister. Populated at `run_until_shutdown`
26/// startup (after `with_state_store`), so handle clones taken before the
27/// bot runs still observe it once set.
28pub(crate) type PersisterSlot = Arc<OnceLock<RiskPersister>>;
29
30/// Aggregated health snapshot returned by [`BotHandle::health`].
31#[derive(Debug, Clone)]
32pub struct BotHealth {
33    /// `true` iff every brain reports healthy AND no service is in a
34    /// non-alive (terminated) state.
35    pub healthy: bool,
36    /// Whether shutdown has been triggered (signal or `handle.shutdown()`).
37    pub shutting_down: bool,
38    /// Per-service lifecycle snapshots from the supervisor.
39    pub services: Vec<ServiceLifecycleSnapshot>,
40    /// One entry per brain, in the order they were passed to `Bot::new`.
41    pub brains: Vec<BrainHealthSnapshot>,
42}
43
44/// Per-brain health information surfaced in [`BotHealth::brains`].
45#[derive(Debug, Clone)]
46pub struct BrainHealthSnapshot {
47    /// Brain name (see `Brain::name`).
48    pub name: String,
49    /// Whether the brain reports itself healthy.
50    pub healthy: bool,
51    /// Total events processed since startup.
52    pub events_processed: u64,
53    /// Number of non-`Hold` decisions emitted.
54    pub non_hold_decisions: u64,
55    /// Free-form details reported by the brain.
56    pub details: serde_json::Value,
57}
58
59/// Cheap cloneable handle into a running [`Bot`](crate::Bot).
60///
61/// See [the module docs](self) for the host-side contract.
62///
63/// # Example
64///
65/// ```no_run
66/// # use std::sync::Arc;
67/// # use rustrade::{Bot, BotConfig, BotHandle};
68/// # async fn run(
69/// #     exchange: Arc<dyn rustrade_core::ExchangeClient>,
70/// #     brains: Vec<Arc<dyn rustrade_core::Brain>>,
71/// # ) -> anyhow::Result<()> {
72/// let config = BotConfig::builder()
73///     .name("demo")
74///     .symbol("BTCUSDT")
75///     .without_signal_handler()
76///     .build()?;
77/// let bot = Bot::new(config, exchange, brains)?;
78/// let handle: BotHandle = bot.handle();
79///
80/// // Subscribe to non-Hold decisions before the bot starts.
81/// let mut signals = handle.subscribe_signals();
82///
83/// // Drive the bot from one task, observe from another.
84/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
85/// tokio::spawn({
86///     let handle = handle.clone();
87///     async move {
88///         while let Ok(sig) = signals.recv().await {
89///             tracing::info!(?sig, "saw a signal");
90///         }
91///         handle.shutdown();
92///     }
93/// });
94/// task.await??;
95/// # Ok(())
96/// # }
97/// ```
98#[derive(Clone)]
99pub struct BotHandle {
100    cancel: CancellationToken,
101    supervisor: Arc<Supervisor>,
102    brains: Arc<Vec<Arc<dyn Brain>>>,
103    risk: RiskStateMap,
104    positions: PositionCache,
105    signals: SignalBus,
106    persister: PersisterSlot,
107    order_tracker: crate::order_tracker::OrderTracker,
108}
109
110impl BotHandle {
111    pub(crate) fn new(
112        supervisor: Arc<Supervisor>,
113        brains: Arc<Vec<Arc<dyn Brain>>>,
114        risk: RiskStateMap,
115        positions: PositionCache,
116        signals: SignalBus,
117        persister: PersisterSlot,
118        order_tracker: crate::order_tracker::OrderTracker,
119    ) -> Self {
120        Self {
121            cancel: supervisor.cancel_token().clone(),
122            supervisor,
123            brains,
124            risk,
125            positions,
126            signals,
127            persister,
128            order_tracker,
129        }
130    }
131
132    /// Snapshot of the resting orders the framework is currently tracking.
133    ///
134    /// Empty unless order tracking is wired via
135    /// [`Bot::with_order_tracking`](crate::Bot::with_order_tracking) and the
136    /// adapter advertises
137    /// [`Capability::OrderTracking`](rustrade_core::Capability::OrderTracking).
138    pub async fn tracked_orders(&self) -> Vec<crate::order_tracker::TrackedOrder> {
139        self.order_tracker.snapshot().await
140    }
141
142    /// Trigger a graceful shutdown. Fire-and-forget; idempotent.
143    pub fn shutdown(&self) {
144        self.cancel.cancel();
145    }
146
147    /// Has shutdown been triggered?
148    pub fn is_shutting_down(&self) -> bool {
149        self.cancel.is_cancelled()
150    }
151
152    /// Resolves once shutdown has been triggered by anyone (signal,
153    /// `shutdown()` call on this or any other handle clone, or programmatic
154    /// supervisor cancellation).
155    pub async fn await_shutdown(&self) {
156        self.cancel.cancelled().await;
157    }
158
159    /// Feed a realised trade outcome into the per-symbol risk state.
160    ///
161    /// Called by the host (or a brain) when a position closes. Updates
162    /// the symbol's `SessionPnl` and records a win/loss on the
163    /// `CircuitBreaker` based on the **net** PnL.
164    ///
165    /// Phase 2b does not automate this from the fill stream — that's
166    /// `FillRoutingService` territory in Phase 2c.
167    pub async fn record_trade_outcome(&self, symbol: &Symbol, gross_pnl: f64, fee: f64) {
168        {
169            let mut map = self.risk.write().await;
170            let Some(risk) = map.get_mut(symbol) else {
171                tracing::warn!(
172                    symbol = %symbol,
173                    "record_trade_outcome: symbol not in risk state map (was it configured?)"
174                );
175                return;
176            };
177            risk.session_pnl.record_close(gross_pnl, fee);
178            let net = gross_pnl - fee;
179            if net > 0.0 {
180                risk.circuit_breaker.record_win();
181            } else if net < 0.0 {
182                risk.circuit_breaker.record_loss();
183            }
184        }
185        // Persist the updated risk state if a store is wired, so a crash
186        // right after this trade doesn't lose its effect on the gates.
187        if let Some(persister) = self.persister.get() {
188            persister.persist_symbol(&self.risk, symbol).await;
189        }
190    }
191
192    /// Read the current cached position for a symbol, or `Position::FLAT`
193    /// if the symbol isn't tracked.
194    pub async fn position(&self, symbol: &Symbol) -> Position {
195        self.positions
196            .read()
197            .await
198            .get(symbol)
199            .copied()
200            .unwrap_or(Position::FLAT)
201    }
202
203    /// Overwrite the cached position for a symbol. Typically called by the
204    /// host's fill-handling code; Phase 2c's `FillRoutingService` will do
205    /// this automatically.
206    pub async fn set_position(&self, symbol: &Symbol, position: Position) {
207        self.positions
208            .write()
209            .await
210            .insert(symbol.clone(), position);
211    }
212
213    /// Subscribe to the bot's signal stream.
214    ///
215    /// The [`ExecutionService`](crate::execution::ExecutionService)
216    /// publishes a [`Signal`] on every non-`Hold` decision a brain emits,
217    /// *before* the risk gates run. Subscribers see the strategic intent;
218    /// whether each signal was acted on is observable from order logs and
219    /// metrics.
220    ///
221    /// The underlying channel is `tokio::sync::broadcast`, so slow
222    /// subscribers will see `RecvError::Lagged(n)` if they fall behind.
223    ///
224    /// # Subscriber lifetime
225    ///
226    /// Because `BotHandle` keeps a `Sender` clone alive, the channel
227    /// does **not** close when the bot exits. A subscriber that loops on
228    /// `recv()` will block forever after shutdown unless it also watches
229    /// a cancellation signal:
230    ///
231    /// ```ignore
232    /// let mut rx = handle.subscribe_signals();
233    /// let shutdown = host.shutdown_token();
234    /// loop {
235    ///     tokio::select! {
236    ///         _ = shutdown.cancelled() => break,
237    ///         r = rx.recv() => match r {
238    ///             Ok(sig) => handle_signal(sig),
239    ///             Err(_)  => break,
240    ///         },
241    ///     }
242    /// }
243    /// ```
244    pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
245        self.signals.subscribe()
246    }
247
248    /// Number of currently-attached signal subscribers.
249    pub fn signal_subscriber_count(&self) -> usize {
250        self.signals.subscriber_count()
251    }
252
253    /// Snapshot of bot-wide health.
254    pub async fn health(&self) -> BotHealth {
255        let services = self.supervisor.lifecycle_snapshots().await;
256
257        let mut brains = Vec::with_capacity(self.brains.len());
258        for brain in self.brains.iter() {
259            let h = brain.health().await;
260            brains.push(BrainHealthSnapshot {
261                name: brain.name().to_string(),
262                healthy: h.healthy,
263                events_processed: h.events_processed,
264                non_hold_decisions: h.non_hold_decisions,
265                details: h.details,
266            });
267        }
268
269        let all_services_alive = services
270            .iter()
271            .all(|s| !matches!(s.phase, rustrade_supervisor::ServicePhase::Terminated));
272        let all_brains_healthy = brains.iter().all(|b| b.healthy);
273
274        BotHealth {
275            healthy: all_services_alive && all_brains_healthy,
276            shutting_down: self.is_shutting_down(),
277            services,
278            brains,
279        }
280    }
281}
282
283impl std::fmt::Debug for BotHandle {
284    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285        f.debug_struct("BotHandle")
286            .field("shutting_down", &self.is_shutting_down())
287            .field("brain_count", &self.brains.len())
288            .finish_non_exhaustive()
289    }
290}