Skip to main content

rustrade/
bot.rs

//! The [`Bot`] entry point and its [`BotConfig`] builder.
//!
//! `Bot::new` validates configuration and constructs a supervised runtime.
//! `Bot::run_until_shutdown` starts the framework services and blocks
//! until shutdown is triggered (via signal or [`BotHandle::shutdown`]).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use rustrade_core::{
12    AssetClass, Brain, CandleSource, Capability, Error, ExchangeClient, FillSource, MarketDataBus,
13    MarketSource, MetricsSink, NoopSink, Position, Result, SignalBus, StateStore, Symbol,
14};
15use rustrade_risk::{CircuitBreakerConfig, PortfolioRiskConfig, SessionPnlConfig, SizingConfig};
16use rustrade_supervisor::{Supervisor, SupervisorConfig};
17use tokio_util::sync::CancellationToken;
18
19use crate::execution::{ExecutionContext, ExecutionService};
20use crate::handle::BotHandle;
21use crate::order_tracker::{OrderReaperService, OrderTracker};
22use crate::risk_state::{
23    PortfolioRiskState, PositionCache, RiskPersister, RiskStateMap, build_portfolio_risk,
24    build_position_cache, build_risk_state,
25};
26use crate::risk_sweep::RiskSweepService;
27use crate::services::{CandlePollerService, FillRoutingService, MarketFeedService};
28
/// Slots in the market-data broadcast bus when the builder's
/// `market_bus_capacity` override is not supplied.
const DEFAULT_MARKET_BUS_CAPACITY: usize = 1024;
/// Slots in the signal broadcast bus when the builder's
/// `signal_bus_capacity` override is not supplied.
const DEFAULT_SIGNAL_BUS_CAPACITY: usize = 256;
/// Interval between risk sweeps. Each sweep looks for the 00:00 UTC rollover
/// of the per-symbol and account risk state and refreshes the portfolio
/// daily-loss latch. Deliberately coarse: these controls work on a day scale.
const RISK_SWEEP_CADENCE: Duration = Duration::from_secs(15);

/// Lifetime of a pending-entry reservation against the portfolio gate.
/// It has to outlast any realistic market-order fill → position-cache refresh
/// round trip, yet stay short enough that a setup with no fill source (whose
/// cache never updates) frees the reservation by itself.
const PENDING_ENTRY_TTL: Duration = Duration::from_secs(30);
41
42/// Risk-layer config for a symbol: session-PnL cap, circuit breaker, and
43/// position sizing. Used both as the bot-wide default
44/// ([`BotConfig::risk`]) and as a per-symbol override
45/// ([`BotConfig::per_symbol_risk`], set via
46/// [`BotConfigBuilder::symbol_risk`]).
47#[derive(Debug, Clone, Default)]
48pub struct RiskConfig {
49    /// Session PnL config applied to every configured symbol.
50    pub session_pnl: SessionPnlConfig,
51    /// Circuit-breaker config applied to every configured symbol.
52    pub circuit_breaker: CircuitBreakerConfig,
53    /// Position-sizing config used by the execution service.
54    pub sizing: SizingConfig,
55}
56
57impl RiskConfig {
58    /// Per-[`AssetClass`] starting presets. These are **sensible defaults to
59    /// tune**, not prescriptions — they mainly differ in leverage (the most
60    /// class-defining knob) and per-trade size; tighten or loosen for your
61    /// venue and risk appetite. Apply one per class via
62    /// [`BotConfigBuilder::class_risk`], or per symbol via
63    /// [`BotConfigBuilder::symbol_risk`].
64    ///
65    /// Crypto perpetuals: moderate 5× leverage (the framework default shape).
66    #[must_use]
67    pub fn crypto_perp() -> Self {
68        Self {
69            session_pnl: SessionPnlConfig { loss_limit: -50.0 },
70            circuit_breaker: CircuitBreakerConfig::default(),
71            sizing: SizingConfig {
72                margin_per_trade: 500.0,
73                leverage: 5,
74                max_contracts: 50,
75            },
76        }
77    }
78
79    /// Crypto spot: **no leverage** (1×) — one unit traded is one base unit.
80    #[must_use]
81    pub fn crypto_spot() -> Self {
82        Self {
83            session_pnl: SessionPnlConfig { loss_limit: -50.0 },
84            circuit_breaker: CircuitBreakerConfig::default(),
85            sizing: SizingConfig {
86                margin_per_trade: 500.0,
87                leverage: 1,
88                max_contracts: 50,
89            },
90        }
91    }
92
93    /// FX: higher leverage is conventional; a tighter sliding breaker.
94    #[must_use]
95    pub fn fx() -> Self {
96        Self {
97            session_pnl: SessionPnlConfig { loss_limit: -100.0 },
98            circuit_breaker: CircuitBreakerConfig {
99                loss_limit: 3,
100                window_secs: 7_200,
101                cooldown_secs: 3_600,
102            },
103            sizing: SizingConfig {
104                margin_per_trade: 1_000.0,
105                leverage: 20,
106                max_contracts: 50,
107            },
108        }
109    }
110
111    /// Dated futures: bigger contracts, moderate leverage, a wider daily cap.
112    #[must_use]
113    pub fn futures() -> Self {
114        Self {
115            session_pnl: SessionPnlConfig { loss_limit: -200.0 },
116            circuit_breaker: CircuitBreakerConfig::default(),
117            sizing: SizingConfig {
118                margin_per_trade: 1_000.0,
119                leverage: 10,
120                max_contracts: 20,
121            },
122        }
123    }
124
125    /// Cash equities: **no leverage** (1×), conservative size.
126    #[must_use]
127    pub fn equity() -> Self {
128        Self {
129            session_pnl: SessionPnlConfig { loss_limit: -200.0 },
130            circuit_breaker: CircuitBreakerConfig::default(),
131            sizing: SizingConfig {
132                margin_per_trade: 1_000.0,
133                leverage: 1,
134                max_contracts: 20,
135            },
136        }
137    }
138
139    /// The starting preset for an [`AssetClass`]: `CryptoSpot` →
140    /// [`Self::crypto_spot`], `Fx` → [`Self::fx`], `Future` → [`Self::futures`],
141    /// `Equity` → [`Self::equity`]; `CryptoPerp` and any other class fall back
142    /// to [`Self::crypto_perp`].
143    #[must_use]
144    pub fn preset_for(class: AssetClass) -> Self {
145        match class {
146            AssetClass::CryptoSpot => Self::crypto_spot(),
147            AssetClass::Fx => Self::fx(),
148            AssetClass::Future => Self::futures(),
149            AssetClass::Equity => Self::equity(),
150            AssetClass::CryptoPerp => Self::crypto_perp(),
151            // `AssetClass` is #[non_exhaustive]: `Other` and any future class
152            // fall back to the crypto-perp shape.
153            _ => Self::crypto_perp(),
154        }
155    }
156}
157
/// How the execution service reacts when a bracket entry has filled but its
/// protective stop-loss leg could not be placed.
///
/// The entry of a bracket goes out *before* the SL + TP legs, because those
/// legs are reduce-only and need an existing position. When the stop-loss
/// leg is subsequently rejected, the bot holds a position lacking the
/// protection the brain requested. This policy selects the follow-up action.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BracketFailurePolicy {
    /// Close the entry right away with a reduce-only market order (the
    /// default). The brain wanted a protected position; when the framework
    /// cannot protect it, it does not keep it. Should the close fail as
    /// well, an error is logged and manual intervention is required.
    #[default]
    CloseEntry,
    /// Leave the position open and log an error. Intended for hosts that run
    /// their own protective layer (e.g. exchange-side account stops) and
    /// can tolerate an unprotected entry.
    KeepUnprotected,
}
180
181/// Configuration for a [`Bot`].
182///
183/// Construct via [`BotConfig::builder`]. The builder validates every
184/// field on [`BotConfigBuilder::build`] and returns `Error::Config` on
185/// any violation — the framework never panics on bad config. `Bot::new`
186/// does a final brain-count check on top.
187///
188/// # Example
189///
190/// ```
191/// use std::time::Duration;
192/// use rustrade::BotConfig;
193///
194/// let config = BotConfig::builder()
195///     .name("market-maker")
196///     .symbols(["BTCUSDT", "ETHUSDT"])
197///     .shutdown_timeout(Duration::from_secs(5))
198///     .without_signal_handler() // tests + embedded use
199///     .build()
200///     .unwrap();
201///
202/// assert_eq!(config.name, "market-maker");
203/// assert_eq!(config.symbols.len(), 2);
204/// ```
205#[derive(Debug, Clone)]
206pub struct BotConfig {
207    /// Human-readable name used in logs, tracing spans, and supervisor
208    /// service identification.
209    pub name: String,
210    /// Symbols this bot trades. Every symbol gets a pre-seeded entry in
211    /// the risk-state map and the position cache. Must be non-empty —
212    /// the position cache and risk-state map would otherwise be empty,
213    /// which is a silent footgun.
214    pub symbols: Vec<Symbol>,
215    /// Maximum time to wait for services to drain on shutdown. Must be
216    /// `> 0`; the supervisor's drain logic needs a non-zero deadline.
217    pub shutdown_timeout: Duration,
218    /// Whether the supervisor installs its own Ctrl-C / SIGTERM handler.
219    /// Disable when the host service drives shutdown via [`BotHandle::shutdown`].
220    pub install_signal_handler: bool,
221    /// Capacity of the in-process market-data broadcast bus. Backed by
222    /// `tokio::sync::broadcast`, which has **drop-oldest** semantics: a
223    /// slow subscriber that falls behind by more than `capacity` events
224    /// sees `RecvError::Lagged(n)` and the oldest dropped events are
225    /// gone. Size this to absorb the worst-case latency between
226    /// publish and slowest subscriber's `recv`.
227    pub market_bus_capacity: usize,
228    /// Capacity of the in-process signal broadcast bus. Same drop-oldest
229    /// semantics as `market_bus_capacity`. Typically smaller — signals
230    /// are emitted ~once per non-Hold decision, far less frequent than
231    /// market events.
232    pub signal_bus_capacity: usize,
233    /// On shutdown, attempt to close any open position for each symbol
234    /// before exit, using `ExchangeClient::close_position`. Best-effort:
235    /// failures are logged but do not propagate.
236    pub close_positions_on_shutdown: bool,
237    /// Risk-layer defaults applied to every configured symbol that has no
238    /// entry in [`Self::per_symbol_risk`].
239    pub risk: RiskConfig,
240    /// Per-symbol risk overrides. A symbol present here uses its own
241    /// [`RiskConfig`] (session-PnL cap, circuit breaker, and sizing) instead
242    /// of [`Self::risk`] — e.g. a tighter drawdown cap on a volatile alt, or
243    /// a larger size on a flagship symbol. Symbols absent here use the
244    /// default. Resolve with [`Self::resolve_risk`].
245    pub per_symbol_risk: HashMap<Symbol, RiskConfig>,
246    /// Per-[`AssetClass`] risk overrides. A symbol whose
247    /// [`InstrumentSpec`](rustrade_core::InstrumentSpec) reports a class present
248    /// here uses that class's [`RiskConfig`] — unless a per-symbol override
249    /// also exists, which wins. Lets one bot apply crypto-perp / spot / FX /
250    /// futures rules side by side. See [`RiskConfig::preset_for`] for starting
251    /// presets and [`Self::resolve_risk`] for the precedence.
252    pub per_class_risk: HashMap<AssetClass, RiskConfig>,
253    /// Account-wide risk applied across **all** symbols: a daily-loss halt,
254    /// a max-concurrent-positions cap, and a gross-exposure cap. Complements
255    /// the per-symbol [`RiskConfig`] gates. Defaults to all-off (opt-in), so
256    /// a bot that doesn't set it behaves exactly as before.
257    pub portfolio: PortfolioRiskConfig,
258    /// What to do when a bracket entry fills but its protective stop-loss
259    /// leg fails to place. Defaults to [`BracketFailurePolicy::CloseEntry`]:
260    /// the unprotected entry is closed with a reduce-only market order.
261    pub bracket_failure_policy: BracketFailurePolicy,
262}
263
264impl BotConfig {
265    /// Begin building a config with sensible defaults.
266    pub fn builder() -> BotConfigBuilder {
267        BotConfigBuilder::default()
268    }
269
270    /// The effective [`RiskConfig`] for `symbol`, ignoring asset class: its
271    /// per-symbol override if set, else the bot-wide default ([`Self::risk`]).
272    /// Prefer [`Self::resolve_risk`], which also honours per-class overrides.
273    pub fn risk_for(&self, symbol: &Symbol) -> &RiskConfig {
274        self.per_symbol_risk.get(symbol).unwrap_or(&self.risk)
275    }
276
277    /// The effective [`RiskConfig`] for `symbol` of the given `asset_class`,
278    /// applying the precedence **per-symbol → per-class → default**. The
279    /// framework resolves `asset_class` from
280    /// [`ExchangeClient::instrument_spec`] at startup, so a multi-asset bot
281    /// gets the right rules per symbol without listing every one.
282    pub fn resolve_risk(&self, symbol: &Symbol, asset_class: AssetClass) -> &RiskConfig {
283        self.per_symbol_risk
284            .get(symbol)
285            .or_else(|| self.per_class_risk.get(&asset_class))
286            .unwrap_or(&self.risk)
287    }
288}
289
290/// Builder for [`BotConfig`].
291#[derive(Debug, Clone, Default)]
292pub struct BotConfigBuilder {
293    name: Option<String>,
294    symbols: Vec<Symbol>,
295    shutdown_timeout: Option<Duration>,
296    install_signal_handler: Option<bool>,
297    market_bus_capacity: Option<usize>,
298    signal_bus_capacity: Option<usize>,
299    close_positions_on_shutdown: Option<bool>,
300    risk: RiskConfig,
301    per_symbol_risk: HashMap<Symbol, RiskConfig>,
302    per_class_risk: HashMap<AssetClass, RiskConfig>,
303    portfolio: PortfolioRiskConfig,
304    bracket_failure_policy: Option<BracketFailurePolicy>,
305}
306
307impl BotConfigBuilder {
308    /// Human-readable bot name (logs, supervisor identification). Required.
309    pub fn name(mut self, name: impl Into<String>) -> Self {
310        self.name = Some(name.into());
311        self
312    }
313
314    /// Add a single symbol. Repeated calls accumulate.
315    pub fn symbol(mut self, sym: impl Into<Symbol>) -> Self {
316        self.symbols.push(sym.into());
317        self
318    }
319
320    /// Add many symbols at once. Repeated calls accumulate.
321    pub fn symbols<I, S>(mut self, syms: I) -> Self
322    where
323        I: IntoIterator<Item = S>,
324        S: Into<Symbol>,
325    {
326        self.symbols.extend(syms.into_iter().map(Into::into));
327        self
328    }
329
330    /// Maximum time to wait for services to drain on shutdown.
331    pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
332        self.shutdown_timeout = Some(dur);
333        self
334    }
335
336    /// Disable the supervisor's signal handler — host drives shutdown.
337    pub fn without_signal_handler(mut self) -> Self {
338        self.install_signal_handler = Some(false);
339        self
340    }
341
342    /// Override the in-process market-data bus capacity (default 1024).
343    pub fn market_bus_capacity(mut self, cap: usize) -> Self {
344        self.market_bus_capacity = Some(cap);
345        self
346    }
347
348    /// Override the in-process signal-bus capacity (default 256).
349    pub fn signal_bus_capacity(mut self, cap: usize) -> Self {
350        self.signal_bus_capacity = Some(cap);
351        self
352    }
353
354    /// Enable best-effort `exchange.close_position` for non-flat positions
355    /// after the supervisor drains.
356    pub fn close_positions_on_shutdown(mut self, b: bool) -> Self {
357        self.close_positions_on_shutdown = Some(b);
358        self
359    }
360
361    /// Override the session-PnL config used for every symbol.
362    pub fn session_pnl_config(mut self, cfg: SessionPnlConfig) -> Self {
363        self.risk.session_pnl = cfg;
364        self
365    }
366
367    /// Override the circuit-breaker config used for every symbol.
368    pub fn circuit_breaker_config(mut self, cfg: CircuitBreakerConfig) -> Self {
369        self.risk.circuit_breaker = cfg;
370        self
371    }
372
373    /// Override the position-sizing config used for every symbol.
374    pub fn sizing_config(mut self, cfg: SizingConfig) -> Self {
375        self.risk.sizing = cfg;
376        self
377    }
378
379    /// Set the account-wide [`PortfolioRiskConfig`] — a daily-loss halt, a
380    /// max-concurrent-positions cap, and a gross-exposure cap applied across
381    /// all symbols. Unset, every limit is off (the per-symbol gates still apply).
382    ///
383    /// ```
384    /// use rustrade::{BotConfig, PortfolioRiskConfig};
385    ///
386    /// let cfg = BotConfig::builder()
387    ///     .name("bot")
388    ///     .symbols(["BTCUSDT", "ETHUSDT"])
389    ///     .without_signal_handler()
390    ///     .portfolio_config(PortfolioRiskConfig {
391    ///         max_daily_loss: -500.0,
392    ///         max_concurrent_positions: 3,
393    ///         max_gross_exposure: 50_000.0,
394    ///     })
395    ///     .build()
396    ///     .unwrap();
397    /// assert_eq!(cfg.portfolio.max_concurrent_positions, 3);
398    /// ```
399    pub fn portfolio_config(mut self, cfg: PortfolioRiskConfig) -> Self {
400        self.portfolio = cfg;
401        self
402    }
403
404    /// Set the [`BracketFailurePolicy`] — what happens when a bracket entry
405    /// fills but its protective stop-loss leg fails to place. Defaults to
406    /// [`BracketFailurePolicy::CloseEntry`].
407    pub fn bracket_failure_policy(mut self, policy: BracketFailurePolicy) -> Self {
408        self.bracket_failure_policy = Some(policy);
409        self
410    }
411
412    /// Set a per-symbol [`RiskConfig`] override. The given symbol then uses
413    /// this config (session-PnL cap, circuit breaker, sizing) instead of the
414    /// bot-wide default. Repeated calls for the same symbol replace the
415    /// previous override. Symbols without an override use the default.
416    ///
417    /// ```
418    /// use rustrade::{BotConfig, RiskConfig};
419    /// use rustrade::SessionPnlConfig;
420    ///
421    /// let cfg = BotConfig::builder()
422    ///     .name("bot")
423    ///     .symbols(["BTCUSDT", "DOGEUSDT"])
424    ///     .without_signal_handler()
425    ///     // tighter daily loss cap on the volatile alt
426    ///     .symbol_risk("DOGEUSDT", RiskConfig {
427    ///         session_pnl: SessionPnlConfig { loss_limit: -20.0 },
428    ///         ..Default::default()
429    ///     })
430    ///     .build()
431    ///     .unwrap();
432    /// assert_eq!(cfg.risk_for(&"DOGEUSDT".into()).session_pnl.loss_limit, -20.0);
433    /// ```
434    pub fn symbol_risk(mut self, symbol: impl Into<Symbol>, cfg: RiskConfig) -> Self {
435        self.per_symbol_risk.insert(symbol.into(), cfg);
436        self
437    }
438
439    /// Set a per-[`AssetClass`] [`RiskConfig`] override. Symbols whose
440    /// instrument reports this class use `cfg` (unless a per-symbol override
441    /// also exists, which wins). See [`RiskConfig::preset_for`] for ready-made
442    /// presets to start from.
443    ///
444    /// ```
445    /// use rustrade::{AssetClass, BotConfig, RiskConfig};
446    ///
447    /// let cfg = BotConfig::builder()
448    ///     .name("multi-asset")
449    ///     .symbols(["XBTUSDTM", "EURUSD"])
450    ///     .without_signal_handler()
451    ///     .class_risk(AssetClass::CryptoPerp, RiskConfig::crypto_perp())
452    ///     .class_risk(AssetClass::Fx, RiskConfig::fx())
453    ///     .build()
454    ///     .unwrap();
455    /// // FX symbols resolve to the FX preset (20× leverage) absent a
456    /// // per-symbol override.
457    /// assert_eq!(
458    ///     cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx).sizing.leverage,
459    ///     20
460    /// );
461    /// ```
462    pub fn class_risk(mut self, class: AssetClass, cfg: RiskConfig) -> Self {
463        self.per_class_risk.insert(class, cfg);
464        self
465    }
466
467    /// Validate and build. Returns `Error::Config` on any constraint
468    /// violation — the framework never panics on bad config.
469    pub fn build(self) -> Result<BotConfig> {
470        let name = self
471            .name
472            .filter(|n| !n.trim().is_empty())
473            .ok_or_else(|| Error::config("BotConfig.name is required and must not be empty"))?;
474
475        if self.symbols.is_empty() {
476            return Err(Error::config(
477                "BotConfig.symbols must contain at least one Symbol — \
478                 the position cache and risk-state map are pre-seeded per symbol",
479            ));
480        }
481
482        let market_bus_capacity = self
483            .market_bus_capacity
484            .unwrap_or(DEFAULT_MARKET_BUS_CAPACITY);
485        if market_bus_capacity == 0 {
486            return Err(Error::config(
487                "BotConfig.market_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
488            ));
489        }
490
491        let signal_bus_capacity = self
492            .signal_bus_capacity
493            .unwrap_or(DEFAULT_SIGNAL_BUS_CAPACITY);
494        if signal_bus_capacity == 0 {
495            return Err(Error::config(
496                "BotConfig.signal_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
497            ));
498        }
499
500        let shutdown_timeout = self.shutdown_timeout.unwrap_or(Duration::from_secs(30));
501        if shutdown_timeout.is_zero() {
502            return Err(Error::config(
503                "BotConfig.shutdown_timeout must be > 0 — drain needs a non-zero deadline",
504            ));
505        }
506
507        // Validate the default risk config and every per-symbol / per-class
508        // override with the same rules — bad config never reaches the runtime.
509        validate_risk(&self.risk, "BotConfig.risk")?;
510        for (sym, cfg) in &self.per_symbol_risk {
511            validate_risk(cfg, &format!("BotConfig.per_symbol_risk[{sym}]"))?;
512        }
513        for (class, cfg) in &self.per_class_risk {
514            validate_risk(cfg, &format!("BotConfig.per_class_risk[{class:?}]"))?;
515        }
516        validate_portfolio(&self.portfolio)?;
517
518        Ok(BotConfig {
519            name,
520            symbols: self.symbols,
521            shutdown_timeout,
522            install_signal_handler: self.install_signal_handler.unwrap_or(true),
523            market_bus_capacity,
524            signal_bus_capacity,
525            close_positions_on_shutdown: self.close_positions_on_shutdown.unwrap_or(false),
526            risk: self.risk,
527            per_symbol_risk: self.per_symbol_risk,
528            per_class_risk: self.per_class_risk,
529            portfolio: self.portfolio,
530            bracket_failure_policy: self.bracket_failure_policy.unwrap_or_default(),
531        })
532    }
533}
534
535/// Validate a [`RiskConfig`] (the default or a per-symbol override).
536/// `ctx` names the offending field in the error.
537fn validate_risk(risk: &RiskConfig, ctx: &str) -> Result<()> {
538    if risk.session_pnl.loss_limit.is_nan() {
539        return Err(Error::config(format!(
540            "{ctx}.session_pnl.loss_limit must not be NaN"
541        )));
542    }
543    if !risk.sizing.margin_per_trade.is_finite() || risk.sizing.margin_per_trade < 0.0 {
544        return Err(Error::config(format!(
545            "{ctx}.sizing.margin_per_trade must be a finite non-negative number"
546        )));
547    }
548    Ok(())
549}
550
551/// Validate the account-wide [`PortfolioRiskConfig`]. `NaN` limits would make
552/// every comparison silently false (a disabled limit that looks enabled), so
553/// they're rejected; `±∞` is allowed as the explicit "disabled" sentinel.
554fn validate_portfolio(p: &PortfolioRiskConfig) -> Result<()> {
555    if p.max_daily_loss.is_nan() {
556        return Err(Error::config(
557            "BotConfig.portfolio.max_daily_loss must not be NaN (use f64::NEG_INFINITY to disable)",
558        ));
559    }
560    if p.max_gross_exposure.is_nan() {
561        return Err(Error::config(
562            "BotConfig.portfolio.max_gross_exposure must not be NaN (use f64::INFINITY to disable)",
563        ));
564    }
565    Ok(())
566}
567
568/// The embedded trading bot.
569///
570/// Owns a [`Supervisor`], an [`ExchangeClient`], one or more [`Brain`]s,
571/// and the in-process [`MarketDataBus`]. Created via [`Bot::new`]; run
572/// via [`Bot::run_until_shutdown`]; observed and steered via the
573/// [`BotHandle`] returned from [`Bot::handle`].
574///
575/// # Example
576///
577/// ```no_run
578/// use std::sync::Arc;
579/// use rustrade::{Bot, BotConfig};
580/// # struct MyBrain;
581/// # #[async_trait::async_trait]
582/// # impl rustrade_core::Brain for MyBrain {
583/// #     fn name(&self) -> &str { "x" }
584/// #     async fn on_event(&self, _: &rustrade_core::MarketDataEvent, _: &rustrade_core::Position)
585/// #         -> rustrade_core::Result<rustrade_core::Decision> {
586/// #         Ok(rustrade_core::Decision::hold())
587/// #     }
588/// #     async fn health(&self) -> rustrade_core::BrainHealth { rustrade_core::BrainHealth::ok() }
589/// # }
590/// # async fn run(exchange: Arc<dyn rustrade_core::ExchangeClient>) -> anyhow::Result<()> {
591/// let config = BotConfig::builder()
592///     .name("my-bot")
593///     .symbol("BTCUSDT")
594///     .build()?;
595///
596/// let bot = Bot::new(config, exchange, vec![Arc::new(MyBrain) as Arc<dyn rustrade_core::Brain>])?;
597/// let handle = bot.handle();
598///
599/// // Spawn the bot; ask it to shut down from elsewhere.
600/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
601/// handle.shutdown();
602/// task.await??;
603/// # Ok(())
604/// # }
605/// ```
606pub struct Bot {
607    config: BotConfig,
608    supervisor: Arc<Supervisor>,
609    exchange: Arc<dyn ExchangeClient>,
610    brains: Arc<Vec<Arc<dyn Brain>>>,
611    market_bus: MarketDataBus,
612    signal_bus: SignalBus,
613    positions: PositionCache,
614    risk: RiskStateMap,
615    metrics: Arc<dyn MetricsSink>,
616    state_store: Option<Arc<dyn StateStore>>,
617    persister_slot: crate::handle::PersisterSlot,
618    handle: BotHandle,
619    external_cancel: Option<CancellationToken>,
620    market_source: Option<Arc<dyn MarketSource>>,
621    fill_source: Option<Arc<dyn FillSource>>,
622    candle_pollers: Vec<CandlePollerSpec>,
623    order_tracker: OrderTracker,
624    order_tracking: Option<OrderTrackingSpec>,
625}
626
/// The arguments handed to `Bot::with_order_tracking`, kept for later use.
struct OrderTrackingSpec {
    /// The `ttl` argument: age beyond which a resting order is cancelled.
    ttl: Duration,
    /// The `poll_cadence` argument: how often a sweep runs.
    poll_cadence: Duration,
}
632
633/// One registered `(symbol, interval, cadence)` for `Bot::with_candle_poller`.
634struct CandlePollerSpec {
635    source: Arc<dyn CandleSource>,
636    symbol: Symbol,
637    interval: Duration,
638    poll_cadence: Duration,
639    limit: usize,
640}
641
642impl Bot {
643    /// Construct a `Bot`. Validates that at least one brain is provided.
644    ///
645    /// The exchange client and brain set are immutable for the bot's
646    /// lifetime — to change them, build a new `Bot`.
647    pub fn new(
648        config: BotConfig,
649        exchange: Arc<dyn ExchangeClient>,
650        brains: Vec<Arc<dyn Brain>>,
651    ) -> Result<Self> {
652        if brains.is_empty() {
653            return Err(Error::config(
654                "Bot::new requires at least one Brain — empty brain list",
655            ));
656        }
657
658        // Multi-brain arbitration guard: no two brains may claim ownership of
659        // the same symbol via `Brain::owned_symbols` — that would let two
660        // strategies fight over one position. Brains returning `None` opt out
661        // and self-coordinate (current behaviour).
662        let mut claimed: HashMap<Symbol, String> = HashMap::new();
663        for brain in &brains {
664            let Some(syms) = brain.owned_symbols() else {
665                continue;
666            };
667            for sym in syms.into_iter().collect::<std::collections::HashSet<_>>() {
668                if let Some(other) = claimed.insert(sym.clone(), brain.name().to_string()) {
669                    return Err(Error::config(format!(
670                        "brains '{other}' and '{}' both declare ownership of {sym} — \
671                         overlapping owned_symbols would fight over one position",
672                        brain.name()
673                    )));
674                }
675            }
676        }
677
678        let supervisor = Arc::new(Supervisor::new(
679            SupervisorConfig::default()
680                .with_shutdown_timeout(config.shutdown_timeout)
681                .with_default_backoff(Default::default())
682                .pipe(|c| {
683                    if config.install_signal_handler {
684                        c
685                    } else {
686                        c.without_signal_handler()
687                    }
688                }),
689        ));
690
691        let market_bus = MarketDataBus::with_capacity(config.market_bus_capacity);
692        let signal_bus = SignalBus::with_capacity(config.signal_bus_capacity);
693        let positions = build_position_cache(&config.symbols);
694        let risk = build_risk_state(&config.symbols, |sym| {
695            // Resolve per-symbol → per-class (by the adapter's asset class) →
696            // default, so a multi-asset bot gets the right gates per symbol.
697            let class = exchange.instrument_spec(sym).asset_class;
698            let r = config.resolve_risk(sym, class);
699            (r.session_pnl.clone(), r.circuit_breaker.clone())
700        });
701
702        let brains = Arc::new(brains);
703        let persister_slot: crate::handle::PersisterSlot = Arc::new(std::sync::OnceLock::new());
704        let order_tracker = OrderTracker::new();
705        let handle = BotHandle::new(
706            supervisor.clone(),
707            brains.clone(),
708            risk.clone(),
709            positions.clone(),
710            signal_bus.clone(),
711            persister_slot.clone(),
712            order_tracker.clone(),
713        );
714
715        Ok(Self {
716            config,
717            supervisor,
718            exchange,
719            brains,
720            market_bus,
721            signal_bus,
722            positions,
723            risk,
724            metrics: Arc::new(NoopSink),
725            state_store: None,
726            persister_slot,
727            order_tracker,
728            handle,
729            external_cancel: None,
730            market_source: None,
731            fill_source: None,
732            candle_pollers: Vec::new(),
733            order_tracking: None,
734        })
735    }
736
737    /// Install a [`MetricsSink`]. The framework's services emit
738    /// counters and histograms to this sink on every observable event;
739    /// the default is [`NoopSink`], which discards everything.
740    pub fn with_metrics(mut self, sink: Arc<dyn MetricsSink>) -> Self {
741        self.metrics = sink;
742        self
743    }
744
745    /// Install a [`StateStore`] so per-symbol risk state (session PnL and
746    /// circuit breaker) survives restarts.
747    ///
748    /// Without a store, risk state is in-memory only: a crash mid-session
749    /// resets the daily drawdown cap and the loss-streak breaker. With one
750    /// wired, the bot:
751    ///
752    /// - **Restores** each symbol's snapshot on startup, then applies the
753    ///   stale-snapshot policy — a session from an earlier UTC day rolls
754    ///   over to fresh, and a breaker whose cooldown elapsed during
755    ///   downtime auto-resets.
756    /// - **Persists** after every realised trade (whether fed via
757    ///   [`BotHandle::record_trade_outcome`](crate::BotHandle::record_trade_outcome)
758    ///   or auto-routed by the `FillRoutingService`).
759    /// - **Flushes** on graceful shutdown.
760    ///
761    /// Use [`rustrade_core::InMemoryStore`] for a non-durable default, or a
762    /// disk-/database-backed implementation from a downstream crate for
763    /// real durability. Snapshots are keyed by `(bot name, symbol)`, so
764    /// distinct bots can share one backend without collision.
765    pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
766        self.state_store = Some(store);
767        self
768    }
769
770    /// Enable resting-order lifecycle tracking.
771    ///
772    /// The [`ExecutionService`] records every resting order it places (limit
773    /// / post-only / IOC / FOK — market orders never rest, so they're
774    /// skipped), and a supervised [`OrderReaperService`] periodically:
775    ///
776    /// - **reconciles** the tracker against `exchange.get_open_orders` (so
777    ///   orders filled or cancelled out-of-band stop being tracked), and
778    /// - **cancels** any resting order older than `ttl` via
779    ///   `exchange.cancel_order`.
780    ///
781    /// `poll_cadence` is how often a sweep runs. The reaper is only spawned
782    /// if the adapter advertises [`Capability::OrderTracking`]; otherwise the
783    /// call is a no-op (with a warning at startup), since the framework can't
784    /// list or cancel orders without it. Live tracked orders are visible via
785    /// [`BotHandle::tracked_orders`](crate::BotHandle::tracked_orders).
786    pub fn with_order_tracking(mut self, ttl: Duration, poll_cadence: Duration) -> Self {
787        self.order_tracking = Some(OrderTrackingSpec { ttl, poll_cadence });
788        self
789    }
790
791    /// Register a [`CandleSource`] to be polled every `poll_cadence` for
792    /// `(symbol, interval)`. Polled candles are deduplicated by
793    /// timestamp and published to the bot's [`MarketDataBus`]. Repeated
794    /// calls accumulate — one supervised service per registered tuple.
795    pub fn with_candle_poller(
796        mut self,
797        source: Arc<dyn CandleSource>,
798        symbol: impl Into<Symbol>,
799        interval: Duration,
800        poll_cadence: Duration,
801        limit: usize,
802    ) -> Self {
803        self.candle_pollers.push(CandlePollerSpec {
804            source,
805            symbol: symbol.into(),
806            interval,
807            poll_cadence,
808            limit,
809        });
810        self
811    }
812
813    /// Tie this bot's shutdown to an externally-owned cancellation token.
814    ///
815    /// When the external token is cancelled, the bot's supervisor token
816    /// is cancelled too — equivalent to calling [`BotHandle::shutdown`]
817    /// but without spawning a linker task in the host.
818    ///
819    /// The reverse is not true: cancelling the bot does not cancel the
820    /// external token.
821    pub fn with_external_cancel(mut self, token: CancellationToken) -> Self {
822        self.external_cancel = Some(token);
823        self
824    }
825
826    /// Attach a [`MarketSource`] to be driven by a supervised
827    /// [`MarketFeedService`]. Source implementors are responsible for
828    /// publishing to the bot's [`MarketDataBus`] (obtain via
829    /// `bot.market_data_bus().clone()` before constructing the source).
830    pub fn with_market_source(mut self, source: Arc<dyn MarketSource>) -> Self {
831        self.market_source = Some(source);
832        self
833    }
834
835    /// Attach a [`FillSource`] to be driven by a supervised
836    /// [`FillRoutingService`]. Fills are routed to every brain via
837    /// `Brain::on_fill` and the position cache is refreshed from the
838    /// exchange after each one.
839    pub fn with_fill_source(mut self, source: Arc<dyn FillSource>) -> Self {
840        self.fill_source = Some(source);
841        self
842    }
843
844    /// Cheap cloneable handle for host services. Can be obtained at any
845    /// point — call before [`Self::run_until_shutdown`] so the host can
846    /// drive shutdown while the bot is running.
847    pub fn handle(&self) -> BotHandle {
848        self.handle.clone()
849    }
850
851    /// Reference to the bot's configuration.
852    pub fn config(&self) -> &BotConfig {
853        &self.config
854    }
855
856    /// Borrow the in-process market-data bus. Host services and adapters
857    /// publish here; the bot's framework services subscribe.
858    pub fn market_data_bus(&self) -> &MarketDataBus {
859        &self.market_bus
860    }
861
862    /// Borrow the in-process signal bus. The execution service publishes
863    /// a [`Signal`](rustrade_core::Signal) to this bus on every
864    /// non-`Hold` decision the brain emits; host services subscribe via
865    /// [`BotHandle::subscribe_signals`].
866    pub fn signal_bus(&self) -> &SignalBus {
867        &self.signal_bus
868    }
869
870    /// Spawn the framework services and run until shutdown.
871    ///
872    /// Returns after all spawned services have drained (or the configured
873    /// shutdown timeout elapses). Consumes `self` to make the
874    /// "construct → run → exit" lifecycle explicit; persistent observation
875    /// of the running bot is done via the [`BotHandle`] obtained earlier.
876    ///
877    /// # Runtime requirements
878    ///
879    /// - **Multi-thread tokio runtime.** The supervisor spawns each
880    ///   service onto `tokio::spawn`. A current-thread runtime works
881    ///   for small loads but loses the per-service parallelism the
882    ///   framework is designed for. Use
883    ///   `#[tokio::main(flavor = "multi_thread")]` or
884    ///   `tokio::runtime::Builder::new_multi_thread()` in the host.
885    /// - **`tokio::spawn` is used internally.** Anywhere the host
886    ///   embeds this method, a tokio runtime context must be active.
887    /// - **No nested runtimes.** `Bot::run_until_shutdown` is async; do
888    ///   not call `block_on` on it from inside another runtime.
889    ///
890    /// # Resource expectations
891    ///
892    /// - **Memory per active symbol:** O(few hundred bytes) for the
893    ///   position cache entry, `SymbolRisk` (a `SessionPnl` plus a
894    ///   `CircuitBreaker` whose ring buffer is bounded by
895    ///   `loss_limit`), plus the per-symbol slot in any host-owned
896    ///   subscriber.
897    /// - **Channel buffers:** `market_bus_capacity` + `signal_bus_capacity`
898    ///   slots per bus, each slot holding a clone of `MarketDataEvent` /
899    ///   `Signal`. Drop-oldest semantics — back-pressure is *not*
900    ///   propagated to publishers.
901    /// - **Expected shutdown time:** ≤ `shutdown_timeout`. A
902    ///   well-behaved service responds to its cancel token in
903    ///   milliseconds; the timeout is the worst-case bound, not the
904    ///   typical case.
905    /// - **Restart-after-crash latency:** bounded by
906    ///   [`BackoffConfig`](rustrade_supervisor::BackoffConfig). Defaults:
907    ///   100 ms base, 60 s cap, 10 retries within a 10-minute window
908    ///   before the circuit breaker trips.
909    pub async fn run_until_shutdown(self) -> anyhow::Result<()> {
910        tracing::info!(
911            bot = %self.config.name,
912            brains = self.brains.len(),
913            symbols = self.config.symbols.len(),
914            exchange = %self.exchange.name(),
915            "rustrade Bot starting"
916        );
917
918        // Best-effort position prefetch — failures don't block startup.
919        self.prefetch_positions().await;
920
921        // Restore persisted risk state (if a store is wired) before any
922        // service runs, then publish the persister so the per-trade paths
923        // can save through it.
924        let persister = self
925            .state_store
926            .clone()
927            .map(|store| RiskPersister::new(store, self.config.name.clone()));
928        if let Some(p) = &persister {
929            p.restore_into(&self.risk).await;
930            // OnceLock: set is infallible the first (only) time per bot.
931            let _ = self.persister_slot.set(p.clone());
932        }
933
934        // Order tracking: only active when wired AND the adapter can list +
935        // cancel orders. Gate once here so both the execution service (which
936        // records resting orders) and the reaper agree.
937        let order_tracking_active =
938            self.order_tracking.is_some() && self.exchange.supports(Capability::OrderTracking);
939        if self.order_tracking.is_some() && !order_tracking_active {
940            tracing::warn!(
941                exchange = %self.exchange.name(),
942                "order tracking requested but adapter lacks Capability::OrderTracking — \
943                 resting orders will NOT be tracked or aged out"
944            );
945        }
946
947        // Bracket (SL+TP / OCO) orders need three things: place protective
948        // stops (StopOrders), cancel the sibling on fill (OrderTracking), and
949        // detect that fill (a fill source). When all hold, a shared OCO
950        // registry links each bracket's legs; otherwise a brain emitting both
951        // SL+TP falls back to a single attached stop-loss (see
952        // `ExecutionService::attach_protection`).
953        let brackets_active = self.exchange.supports(Capability::StopOrders)
954            && self.exchange.supports(Capability::OrderTracking)
955            && self.fill_source.is_some();
956        let oco = brackets_active.then(crate::order_tracker::OcoRegistry::new);
957        tracing::info!(brackets_active, "bracket (SL+TP/OCO) support");
958
959        // Resolve each symbol's effective sizing (per-symbol → per-class by
960        // asset class → default) so multi-asset bots size each class correctly.
961        let sizing = Arc::new(crate::execution::SymbolSizing::new(
962            self.config.risk.sizing.clone(),
963            self.config
964                .symbols
965                .iter()
966                .map(|s| {
967                    let class = self.exchange.instrument_spec(s).asset_class;
968                    (s.clone(), self.config.resolve_risk(s, class).sizing.clone())
969                })
970                .collect(),
971        ));
972        // Account-wide portfolio risk, shared between the execution gate
973        // (reads it) and the risk sweep (maintains its daily-loss latch).
974        let portfolio: PortfolioRiskState = build_portfolio_risk(self.config.portfolio.clone());
975
976        // Pending-entry reservations shared by every execution service and
977        // the fill router — makes the portfolio gate check-and-reserve.
978        let pending = crate::pending::PendingEntryLedger::new(PENDING_ENTRY_TTL);
979
980        let ctx = ExecutionContext {
981            exchange: self.exchange.clone(),
982            bus: self.market_bus.clone(),
983            signals: self.signal_bus.clone(),
984            positions: self.positions.clone(),
985            risk: self.risk.clone(),
986            portfolio: portfolio.clone(),
987            sizing,
988            order_tracker: order_tracking_active.then(|| self.order_tracker.clone()),
989            oco: oco.clone(),
990            bracket_failure_policy: self.config.bracket_failure_policy,
991            pending: pending.clone(),
992        };
993
994        for brain in self.brains.iter() {
995            let svc = ExecutionService::new(brain.clone(), ctx.clone());
996            self.supervisor.spawn_service(Box::new(svc));
997        }
998
999        // Periodic risk sweep: rolls per-symbol SessionPnl / CircuitBreaker and
1000        // the portfolio halt over at 00:00 UTC during a live run (without it,
1001        // tick() only fires on restart), and re-derives the account daily-loss
1002        // latch from the per-symbol session PnLs.
1003        self.supervisor
1004            .spawn_service(Box::new(RiskSweepService::new(
1005                self.risk.clone(),
1006                portfolio.clone(),
1007                RISK_SWEEP_CADENCE,
1008            )));
1009
1010        if order_tracking_active {
1011            // Safe: order_tracking_active implies order_tracking is Some.
1012            let spec = self.order_tracking.as_ref().unwrap();
1013            self.supervisor
1014                .spawn_service(Box::new(OrderReaperService::new(
1015                    self.exchange.clone(),
1016                    self.order_tracker.clone(),
1017                    self.config.symbols.clone(),
1018                    spec.ttl,
1019                    spec.poll_cadence,
1020                    self.metrics.clone(),
1021                )));
1022        }
1023
1024        if let Some(source) = self.market_source.clone() {
1025            self.supervisor
1026                .spawn_service(Box::new(MarketFeedService::new(source)));
1027        }
1028
1029        if let Some(source) = self.fill_source.clone() {
1030            self.supervisor
1031                .spawn_service(Box::new(FillRoutingService::new(
1032                    source,
1033                    self.brains.clone(),
1034                    self.exchange.clone(),
1035                    self.positions.clone(),
1036                    self.risk.clone(),
1037                    self.metrics.clone(),
1038                    persister.clone(),
1039                    oco.clone(),
1040                    pending.clone(),
1041                )));
1042        }
1043
1044        for spec in &self.candle_pollers {
1045            self.supervisor
1046                .spawn_service(Box::new(CandlePollerService::new(
1047                    spec.source.clone(),
1048                    spec.symbol.clone(),
1049                    spec.interval,
1050                    spec.poll_cadence,
1051                    spec.limit,
1052                    self.market_bus.clone(),
1053                    self.metrics.clone(),
1054                )));
1055        }
1056
1057        // External cancellation linker: when the host's token fires,
1058        // cancel the supervisor's root token. The reverse is not wired.
1059        if let Some(external) = self.external_cancel.clone() {
1060            let supervisor = self.supervisor.clone();
1061            tokio::spawn(async move {
1062                external.cancelled().await;
1063                tracing::info!("external cancellation received; triggering bot shutdown");
1064                supervisor.trigger_shutdown();
1065            });
1066        }
1067
1068        let run_result = self.supervisor.run_until_shutdown().await;
1069
1070        if self.config.close_positions_on_shutdown {
1071            self.close_open_positions().await;
1072        }
1073
1074        // Persist final risk state + flush before exit so the next boot
1075        // restores an up-to-date snapshot.
1076        if let Some(p) = &persister {
1077            p.persist_all(&self.risk).await;
1078        }
1079
1080        for brain in self.brains.iter() {
1081            let health = brain.health().await;
1082            tracing::info!(
1083                brain = %brain.name(),
1084                healthy = health.healthy,
1085                events = health.events_processed,
1086                non_hold = health.non_hold_decisions,
1087                "final brain health"
1088            );
1089        }
1090
1091        tracing::info!(bot = %self.config.name, "rustrade Bot exited");
1092        run_result
1093    }
1094
1095    async fn prefetch_positions(&self) {
1096        for symbol in &self.config.symbols {
1097            match self.exchange.get_position(symbol).await {
1098                Ok(pos) => {
1099                    self.positions.write().await.insert(symbol.clone(), pos);
1100                    tracing::debug!(
1101                        symbol = %symbol,
1102                        qty = pos.qty,
1103                        "prefetched position from exchange"
1104                    );
1105                }
1106                Err(e) => {
1107                    tracing::warn!(
1108                        symbol = %symbol,
1109                        error = %e,
1110                        "failed to prefetch position; cache defaults to FLAT"
1111                    );
1112                }
1113            }
1114        }
1115    }
1116
1117    async fn close_open_positions(&self) {
1118        let snapshot: Vec<(Symbol, Position)> = {
1119            let map = self.positions.read().await;
1120            map.iter()
1121                .filter(|(_, p)| !p.is_flat())
1122                .map(|(s, p)| (s.clone(), *p))
1123                .collect()
1124        };
1125
1126        if snapshot.is_empty() {
1127            tracing::info!("close_positions_on_shutdown: no open positions");
1128            return;
1129        }
1130
1131        for (symbol, position) in snapshot {
1132            match self.exchange.close_position(&symbol, &position).await {
1133                Ok(order_id) => tracing::info!(
1134                    symbol = %symbol,
1135                    qty = position.qty,
1136                    order_id = %order_id,
1137                    "close_positions_on_shutdown: closed"
1138                ),
1139                Err(e) => tracing::error!(
1140                    symbol = %symbol,
1141                    qty = position.qty,
1142                    error = %e,
1143                    "close_positions_on_shutdown: failed (best-effort)"
1144                ),
1145            }
1146        }
1147    }
1148}
1149
// Module-local `pipe` combinator for builder ergonomics: `value.pipe(f)` is
// simply `f(value)`, which keeps conditional builder steps in `Bot::new`
// readable as one chain.
trait Pipe: Sized {
    /// Feed `self` through `f` and return whatever `f` produces.
    fn pipe<F>(self, f: F) -> Self
    where
        F: FnOnce(Self) -> Self,
    {
        f(self)
    }
}

// Blanket impl: every sized type gets `pipe`.
impl<T> Pipe for T {}
1158
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use rustrade_core::{Fill, MarketDataEvent, Order, Position};

    /// Assert that `result` failed with [`Error::Config`].
    ///
    /// `what` names the scenario and is included in the panic message, as is
    /// the actual error when a different variant came back. `T` needs no
    /// `Debug` bound, so this also works for `Bot`, and `#[track_caller]`
    /// makes a failure point at the calling test.
    #[track_caller]
    fn expect_config_error<T>(result: std::result::Result<T, Error>, what: &str) {
        match result {
            Err(Error::Config(_)) => {}
            Err(other) => panic!("{what}: expected Error::Config, got {other:?}"),
            Ok(_) => panic!("{what}: expected Error::Config, got Ok(..)"),
        }
    }

    #[test]
    fn presets_differ_in_leverage_by_class() {
        assert_eq!(RiskConfig::crypto_perp().sizing.leverage, 5);
        assert_eq!(RiskConfig::crypto_spot().sizing.leverage, 1);
        assert_eq!(RiskConfig::fx().sizing.leverage, 20);
        assert_eq!(RiskConfig::futures().sizing.leverage, 10);
        assert_eq!(RiskConfig::equity().sizing.leverage, 1);
    }

    #[test]
    fn preset_for_maps_each_class_with_perp_fallback() {
        assert_eq!(
            RiskConfig::preset_for(AssetClass::CryptoSpot)
                .sizing
                .leverage,
            1
        );
        assert_eq!(RiskConfig::preset_for(AssetClass::Fx).sizing.leverage, 20);
        assert_eq!(
            RiskConfig::preset_for(AssetClass::Future).sizing.leverage,
            10
        );
        assert_eq!(
            RiskConfig::preset_for(AssetClass::Equity).sizing.leverage,
            1
        );
        // CryptoPerp and the catch-all (`Other` / future classes) → perp shape.
        assert_eq!(
            RiskConfig::preset_for(AssetClass::CryptoPerp)
                .sizing
                .leverage,
            5
        );
        assert_eq!(RiskConfig::preset_for(AssetClass::Other).sizing.leverage, 5);
    }

    #[test]
    fn resolve_risk_precedence_symbol_then_class_then_default() {
        let cfg = BotConfig::builder()
            .name("t")
            .symbols(["XBTUSDTM", "EURUSD", "ETHUSDTM"])
            .without_signal_handler()
            .class_risk(AssetClass::Fx, RiskConfig::fx())
            .symbol_risk(
                "ETHUSDTM",
                RiskConfig {
                    sizing: SizingConfig {
                        margin_per_trade: 1.0,
                        leverage: 7,
                        max_contracts: 1,
                    },
                    ..Default::default()
                },
            )
            .build()
            .unwrap();

        // Per-symbol override wins even when a class override would also match.
        assert_eq!(
            cfg.resolve_risk(&"ETHUSDTM".into(), AssetClass::Fx)
                .sizing
                .leverage,
            7
        );
        // No per-symbol override → the per-class override applies.
        assert_eq!(
            cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx)
                .sizing
                .leverage,
            20
        );
        // Neither → the bot-wide default.
        let default_lev = RiskConfig::default().sizing.leverage;
        assert_eq!(
            cfg.resolve_risk(&"XBTUSDTM".into(), AssetClass::CryptoPerp)
                .sizing
                .leverage,
            default_lev
        );
    }

    /// Brain that always holds and declares no symbol ownership.
    struct NoopBrain;
    #[async_trait]
    impl Brain for NoopBrain {
        fn name(&self) -> &str {
            "noop"
        }
        async fn on_event(
            &self,
            _e: &MarketDataEvent,
            _p: &Position,
        ) -> Result<rustrade_core::Decision> {
            Ok(rustrade_core::Decision::hold())
        }
    }

    /// Brain that declares ownership of a fixed symbol set, for the
    /// multi-brain arbitration guard tests.
    struct OwningBrain {
        name: &'static str,
        owns: Vec<Symbol>,
    }
    #[async_trait]
    impl Brain for OwningBrain {
        fn name(&self) -> &str {
            self.name
        }
        fn owned_symbols(&self) -> Option<Vec<Symbol>> {
            Some(self.owns.clone())
        }
        async fn on_event(
            &self,
            _e: &MarketDataEvent,
            _p: &Position,
        ) -> Result<rustrade_core::Decision> {
            Ok(rustrade_core::Decision::hold())
        }
    }

    /// Exchange stub: every call succeeds with a fixed, empty-ish answer.
    struct NoopExchange;
    #[async_trait]
    impl ExchangeClient for NoopExchange {
        fn name(&self) -> &str {
            "noop"
        }
        async fn place_order(&self, _o: &Order) -> Result<String> {
            Ok("noop-1".into())
        }
        async fn cancel_all(&self, _s: &Symbol) -> Result<usize> {
            Ok(0)
        }
        async fn close_position(&self, _s: &Symbol, _p: &Position) -> Result<String> {
            Ok("noop-close".into())
        }
        async fn get_position(&self, _s: &Symbol) -> Result<Position> {
            Ok(Position::FLAT)
        }
        async fn get_balance(&self, _c: &str) -> Result<f64> {
            Ok(0.0)
        }
    }

    /// Minimal valid single-symbol config with the signal handler disabled.
    fn cfg() -> BotConfig {
        BotConfig::builder()
            .name("test")
            .symbol("BTCUSDT")
            .without_signal_handler()
            .build()
            .unwrap()
    }

    #[test]
    fn builder_requires_name() {
        expect_config_error(BotConfig::builder().build(), "builder without a name");
    }

    #[test]
    fn builder_rejects_blank_name() {
        expect_config_error(
            BotConfig::builder().name("   ").build(),
            "whitespace-only name",
        );
    }

    #[test]
    fn builder_rejects_zero_market_bus_capacity() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .market_bus_capacity(0)
                .build(),
            "zero market bus capacity",
        );
    }

    #[test]
    fn builder_rejects_zero_signal_bus_capacity() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .signal_bus_capacity(0)
                .build(),
            "zero signal bus capacity",
        );
    }

    #[test]
    fn builder_rejects_empty_symbol_list() {
        expect_config_error(BotConfig::builder().name("x").build(), "empty symbol list");
    }

    #[test]
    fn builder_rejects_zero_shutdown_timeout() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .shutdown_timeout(Duration::ZERO)
                .build(),
            "zero shutdown timeout",
        );
    }

    #[test]
    fn builder_rejects_nan_loss_limit() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .session_pnl_config(SessionPnlConfig {
                    loss_limit: f64::NAN,
                })
                .build(),
            "NaN loss limit",
        );
    }

    #[test]
    fn builder_rejects_non_finite_margin() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .sizing_config(SizingConfig {
                    margin_per_trade: f64::INFINITY,
                    leverage: 1,
                    max_contracts: 1,
                })
                .build(),
            "infinite margin_per_trade",
        );
    }

    #[test]
    fn builder_accumulates_symbols() {
        let c = BotConfig::builder()
            .name("x")
            .symbol("A")
            .symbols(["B", "C"])
            .build()
            .unwrap();
        // Check every slot so both the count and the insertion order are pinned.
        assert_eq!(c.symbols.len(), 3);
        assert_eq!(c.symbols[0], Symbol::new("A"));
        assert_eq!(c.symbols[1], Symbol::new("B"));
        assert_eq!(c.symbols[2], Symbol::new("C"));
    }

    #[test]
    fn builder_accepts_risk_overrides() {
        let c = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .session_pnl_config(SessionPnlConfig { loss_limit: -123.0 })
            .sizing_config(SizingConfig {
                margin_per_trade: 250.0,
                leverage: 10,
                max_contracts: 5,
            })
            .build()
            .unwrap();
        assert_eq!(c.risk.session_pnl.loss_limit, -123.0);
        assert_eq!(c.risk.sizing.margin_per_trade, 250.0);
        assert_eq!(c.risk.sizing.leverage, 10);
        assert_eq!(c.risk.sizing.max_contracts, 5);
    }

    #[test]
    fn per_symbol_risk_override_and_fallback() {
        let c = BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .session_pnl_config(SessionPnlConfig { loss_limit: -100.0 }) // default
            .symbol_risk(
                "BTCUSDT",
                RiskConfig {
                    session_pnl: SessionPnlConfig { loss_limit: -25.0 },
                    ..Default::default()
                },
            )
            .build()
            .unwrap();
        // BTC uses its override; ETH (no override) uses the default.
        assert_eq!(
            c.risk_for(&Symbol::from("BTCUSDT")).session_pnl.loss_limit,
            -25.0
        );
        assert_eq!(
            c.risk_for(&Symbol::from("ETHUSDT")).session_pnl.loss_limit,
            -100.0
        );
    }

    #[test]
    fn builder_rejects_invalid_per_symbol_override() {
        expect_config_error(
            BotConfig::builder()
                .name("x")
                .symbol("BTCUSDT")
                .symbol_risk(
                    "BTCUSDT",
                    RiskConfig {
                        session_pnl: SessionPnlConfig {
                            loss_limit: f64::NAN,
                        },
                        ..Default::default()
                    },
                )
                .build(),
            "NaN loss limit in a per-symbol override",
        );
    }

    #[test]
    fn builder_has_separate_default_bus_capacities() {
        let c = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .build()
            .unwrap();
        // Literal values on purpose: this pins the documented defaults.
        assert_eq!(c.market_bus_capacity, 1024);
        assert_eq!(c.signal_bus_capacity, 256);
    }

    #[tokio::test]
    async fn bot_requires_at_least_one_brain() {
        expect_config_error(
            Bot::new(cfg(), Arc::new(NoopExchange), vec![]),
            "empty brain list",
        );
    }

    #[tokio::test]
    async fn bot_constructs_and_exposes_handle() {
        let bot = Bot::new(cfg(), Arc::new(NoopExchange), vec![Arc::new(NoopBrain)]).unwrap();
        let handle = bot.handle();
        assert!(!handle.is_shutting_down());
        assert_eq!(bot.config().name, "test");
        let h2 = handle.clone();
        assert!(!h2.is_shutting_down());
    }

    #[tokio::test]
    async fn rejects_overlapping_owned_symbols() {
        // Two brains both claim BTCUSDT → config error.
        let cfg = BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .without_signal_handler()
            .build()
            .unwrap();
        let a = Arc::new(OwningBrain {
            name: "a",
            owns: vec![Symbol::from("BTCUSDT")],
        });
        let b = Arc::new(OwningBrain {
            name: "b",
            owns: vec![Symbol::from("BTCUSDT"), Symbol::from("ETHUSDT")],
        });
        expect_config_error(
            Bot::new(cfg, Arc::new(NoopExchange), vec![a, b]),
            "overlapping owned_symbols must be rejected",
        );
    }

    #[tokio::test]
    async fn accepts_disjoint_owned_symbols() {
        // Disjoint ownership is fine.
        let cfg = BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .without_signal_handler()
            .build()
            .unwrap();
        let a = Arc::new(OwningBrain {
            name: "a",
            owns: vec![Symbol::from("BTCUSDT")],
        });
        let b = Arc::new(OwningBrain {
            name: "b",
            owns: vec![Symbol::from("ETHUSDT")],
        });
        assert!(Bot::new(cfg, Arc::new(NoopExchange), vec![a, b]).is_ok());
    }

    #[tokio::test]
    async fn none_owners_are_not_guarded() {
        // Two `None` (catch-all) brains coexist — they opt out of the guard.
        let bot = Bot::new(
            cfg(),
            Arc::new(NoopExchange),
            vec![Arc::new(NoopBrain), Arc::new(NoopBrain)],
        );
        assert!(bot.is_ok());
    }

    // Never called. NOTE(review): presumably exists only so the `Fill`
    // import stays referenced; confirm before removing. The `allow` silences
    // the resulting dead-code warning.
    #[allow(dead_code)]
    fn _noop_fill_compiles(_: &Fill) {}
}