Skip to main content

rustrade/
bot.rs

1//! The [`Bot`] entry point and its [`BotConfig`] builder.
2//!
3//! `Bot::new` validates configuration and constructs a supervised runtime.
4//! `Bot::run_until_shutdown` starts the framework services and blocks
5//! until shutdown is triggered (via signal or [`BotHandle::shutdown`]).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use rustrade_core::{
12    AssetClass, Brain, CandleSource, Capability, Error, ExchangeClient, FillSource, MarketDataBus,
13    MarketSource, MetricsSink, NoopSink, Position, Result, SignalBus, StateStore, Symbol,
14};
15use rustrade_risk::{CircuitBreakerConfig, PortfolioRiskConfig, SessionPnlConfig, SizingConfig};
16use rustrade_supervisor::{Supervisor, SupervisorConfig};
17use tokio_util::sync::CancellationToken;
18
19use crate::execution::{ExecutionContext, ExecutionService};
20use crate::handle::BotHandle;
21use crate::order_tracker::{OrderReaperService, OrderTracker};
22use crate::risk_state::{
23    PortfolioRiskState, PositionCache, RiskPersister, RiskStateMap, build_portfolio_risk,
24    build_position_cache, build_risk_state,
25};
26use crate::risk_sweep::RiskSweepService;
27use crate::services::{CandlePollerService, FillRoutingService, MarketFeedService};
28
29const DEFAULT_MARKET_BUS_CAPACITY: usize = 1024;
30const DEFAULT_SIGNAL_BUS_CAPACITY: usize = 256;
31/// How often the risk sweep runs: detects the 00:00 UTC rollover for the
32/// per-symbol and account risk state, and refreshes the portfolio daily-loss
33/// latch. Coarse on purpose — these are day-scale controls.
34const RISK_SWEEP_CADENCE: Duration = Duration::from_secs(15);
35
36/// Risk-layer config for a symbol: session-PnL cap, circuit breaker, and
37/// position sizing. Used both as the bot-wide default
38/// ([`BotConfig::risk`]) and as a per-symbol override
39/// ([`BotConfig::per_symbol_risk`], set via
40/// [`BotConfigBuilder::symbol_risk`]).
41#[derive(Debug, Clone, Default)]
42pub struct RiskConfig {
43    /// Session PnL config applied to every configured symbol.
44    pub session_pnl: SessionPnlConfig,
45    /// Circuit-breaker config applied to every configured symbol.
46    pub circuit_breaker: CircuitBreakerConfig,
47    /// Position-sizing config used by the execution service.
48    pub sizing: SizingConfig,
49}
50
51impl RiskConfig {
52    /// Per-[`AssetClass`] starting presets. These are **sensible defaults to
53    /// tune**, not prescriptions — they mainly differ in leverage (the most
54    /// class-defining knob) and per-trade size; tighten or loosen for your
55    /// venue and risk appetite. Apply one per class via
56    /// [`BotConfigBuilder::class_risk`], or per symbol via
57    /// [`BotConfigBuilder::symbol_risk`].
58    ///
59    /// Crypto perpetuals: moderate 5× leverage (the framework default shape).
60    #[must_use]
61    pub fn crypto_perp() -> Self {
62        Self {
63            session_pnl: SessionPnlConfig { loss_limit: -50.0 },
64            circuit_breaker: CircuitBreakerConfig::default(),
65            sizing: SizingConfig {
66                margin_per_trade: 500.0,
67                leverage: 5,
68                max_contracts: 50,
69            },
70        }
71    }
72
73    /// Crypto spot: **no leverage** (1×) — one unit traded is one base unit.
74    #[must_use]
75    pub fn crypto_spot() -> Self {
76        Self {
77            session_pnl: SessionPnlConfig { loss_limit: -50.0 },
78            circuit_breaker: CircuitBreakerConfig::default(),
79            sizing: SizingConfig {
80                margin_per_trade: 500.0,
81                leverage: 1,
82                max_contracts: 50,
83            },
84        }
85    }
86
87    /// FX: higher leverage is conventional; a tighter sliding breaker.
88    #[must_use]
89    pub fn fx() -> Self {
90        Self {
91            session_pnl: SessionPnlConfig { loss_limit: -100.0 },
92            circuit_breaker: CircuitBreakerConfig {
93                loss_limit: 3,
94                window_secs: 7_200,
95                cooldown_secs: 3_600,
96            },
97            sizing: SizingConfig {
98                margin_per_trade: 1_000.0,
99                leverage: 20,
100                max_contracts: 50,
101            },
102        }
103    }
104
105    /// Dated futures: bigger contracts, moderate leverage, a wider daily cap.
106    #[must_use]
107    pub fn futures() -> Self {
108        Self {
109            session_pnl: SessionPnlConfig { loss_limit: -200.0 },
110            circuit_breaker: CircuitBreakerConfig::default(),
111            sizing: SizingConfig {
112                margin_per_trade: 1_000.0,
113                leverage: 10,
114                max_contracts: 20,
115            },
116        }
117    }
118
119    /// Cash equities: **no leverage** (1×), conservative size.
120    #[must_use]
121    pub fn equity() -> Self {
122        Self {
123            session_pnl: SessionPnlConfig { loss_limit: -200.0 },
124            circuit_breaker: CircuitBreakerConfig::default(),
125            sizing: SizingConfig {
126                margin_per_trade: 1_000.0,
127                leverage: 1,
128                max_contracts: 20,
129            },
130        }
131    }
132
133    /// The starting preset for an [`AssetClass`]: `CryptoSpot` →
134    /// [`Self::crypto_spot`], `Fx` → [`Self::fx`], `Future` → [`Self::futures`],
135    /// `Equity` → [`Self::equity`]; `CryptoPerp` and any other class fall back
136    /// to [`Self::crypto_perp`].
137    #[must_use]
138    pub fn preset_for(class: AssetClass) -> Self {
139        match class {
140            AssetClass::CryptoSpot => Self::crypto_spot(),
141            AssetClass::Fx => Self::fx(),
142            AssetClass::Future => Self::futures(),
143            AssetClass::Equity => Self::equity(),
144            AssetClass::CryptoPerp => Self::crypto_perp(),
145            // `AssetClass` is #[non_exhaustive]: `Other` and any future class
146            // fall back to the crypto-perp shape.
147            _ => Self::crypto_perp(),
148        }
149    }
150}
151
152/// Configuration for a [`Bot`].
153///
154/// Construct via [`BotConfig::builder`]. The builder validates every
155/// field on [`BotConfigBuilder::build`] and returns `Error::Config` on
156/// any violation — the framework never panics on bad config. `Bot::new`
157/// does a final brain-count check on top.
158///
159/// # Example
160///
161/// ```
162/// use std::time::Duration;
163/// use rustrade::BotConfig;
164///
165/// let config = BotConfig::builder()
166///     .name("market-maker")
167///     .symbols(["BTCUSDT", "ETHUSDT"])
168///     .shutdown_timeout(Duration::from_secs(5))
169///     .without_signal_handler() // tests + embedded use
170///     .build()
171///     .unwrap();
172///
173/// assert_eq!(config.name, "market-maker");
174/// assert_eq!(config.symbols.len(), 2);
175/// ```
176#[derive(Debug, Clone)]
177pub struct BotConfig {
178    /// Human-readable name used in logs, tracing spans, and supervisor
179    /// service identification.
180    pub name: String,
181    /// Symbols this bot trades. Every symbol gets a pre-seeded entry in
182    /// the risk-state map and the position cache. Must be non-empty —
183    /// the position cache and risk-state map would otherwise be empty,
184    /// which is a silent footgun.
185    pub symbols: Vec<Symbol>,
186    /// Maximum time to wait for services to drain on shutdown. Must be
187    /// `> 0`; the supervisor's drain logic needs a non-zero deadline.
188    pub shutdown_timeout: Duration,
189    /// Whether the supervisor installs its own Ctrl-C / SIGTERM handler.
190    /// Disable when the host service drives shutdown via [`BotHandle::shutdown`].
191    pub install_signal_handler: bool,
192    /// Capacity of the in-process market-data broadcast bus. Backed by
193    /// `tokio::sync::broadcast`, which has **drop-oldest** semantics: a
194    /// slow subscriber that falls behind by more than `capacity` events
195    /// sees `RecvError::Lagged(n)` and the oldest dropped events are
196    /// gone. Size this to absorb the worst-case latency between
197    /// publish and slowest subscriber's `recv`.
198    pub market_bus_capacity: usize,
199    /// Capacity of the in-process signal broadcast bus. Same drop-oldest
200    /// semantics as `market_bus_capacity`. Typically smaller — signals
201    /// are emitted ~once per non-Hold decision, far less frequent than
202    /// market events.
203    pub signal_bus_capacity: usize,
204    /// On shutdown, attempt to close any open position for each symbol
205    /// before exit, using `ExchangeClient::close_position`. Best-effort:
206    /// failures are logged but do not propagate.
207    pub close_positions_on_shutdown: bool,
208    /// Risk-layer defaults applied to every configured symbol that has no
209    /// entry in [`Self::per_symbol_risk`].
210    pub risk: RiskConfig,
211    /// Per-symbol risk overrides. A symbol present here uses its own
212    /// [`RiskConfig`] (session-PnL cap, circuit breaker, and sizing) instead
213    /// of [`Self::risk`] — e.g. a tighter drawdown cap on a volatile alt, or
214    /// a larger size on a flagship symbol. Symbols absent here use the
215    /// default. Resolve with [`Self::resolve_risk`].
216    pub per_symbol_risk: HashMap<Symbol, RiskConfig>,
217    /// Per-[`AssetClass`] risk overrides. A symbol whose
218    /// [`InstrumentSpec`](rustrade_core::InstrumentSpec) reports a class present
219    /// here uses that class's [`RiskConfig`] — unless a per-symbol override
220    /// also exists, which wins. Lets one bot apply crypto-perp / spot / FX /
221    /// futures rules side by side. See [`RiskConfig::preset_for`] for starting
222    /// presets and [`Self::resolve_risk`] for the precedence.
223    pub per_class_risk: HashMap<AssetClass, RiskConfig>,
224    /// Account-wide risk applied across **all** symbols: a daily-loss halt,
225    /// a max-concurrent-positions cap, and a gross-exposure cap. Complements
226    /// the per-symbol [`RiskConfig`] gates. Defaults to all-off (opt-in), so
227    /// a bot that doesn't set it behaves exactly as before.
228    pub portfolio: PortfolioRiskConfig,
229}
230
231impl BotConfig {
232    /// Begin building a config with sensible defaults.
233    pub fn builder() -> BotConfigBuilder {
234        BotConfigBuilder::default()
235    }
236
237    /// The effective [`RiskConfig`] for `symbol`, ignoring asset class: its
238    /// per-symbol override if set, else the bot-wide default ([`Self::risk`]).
239    /// Prefer [`Self::resolve_risk`], which also honours per-class overrides.
240    pub fn risk_for(&self, symbol: &Symbol) -> &RiskConfig {
241        self.per_symbol_risk.get(symbol).unwrap_or(&self.risk)
242    }
243
244    /// The effective [`RiskConfig`] for `symbol` of the given `asset_class`,
245    /// applying the precedence **per-symbol → per-class → default**. The
246    /// framework resolves `asset_class` from
247    /// [`ExchangeClient::instrument_spec`] at startup, so a multi-asset bot
248    /// gets the right rules per symbol without listing every one.
249    pub fn resolve_risk(&self, symbol: &Symbol, asset_class: AssetClass) -> &RiskConfig {
250        self.per_symbol_risk
251            .get(symbol)
252            .or_else(|| self.per_class_risk.get(&asset_class))
253            .unwrap_or(&self.risk)
254    }
255}
256
257/// Builder for [`BotConfig`].
258#[derive(Debug, Clone, Default)]
259pub struct BotConfigBuilder {
260    name: Option<String>,
261    symbols: Vec<Symbol>,
262    shutdown_timeout: Option<Duration>,
263    install_signal_handler: Option<bool>,
264    market_bus_capacity: Option<usize>,
265    signal_bus_capacity: Option<usize>,
266    close_positions_on_shutdown: Option<bool>,
267    risk: RiskConfig,
268    per_symbol_risk: HashMap<Symbol, RiskConfig>,
269    per_class_risk: HashMap<AssetClass, RiskConfig>,
270    portfolio: PortfolioRiskConfig,
271}
272
273impl BotConfigBuilder {
274    /// Human-readable bot name (logs, supervisor identification). Required.
275    pub fn name(mut self, name: impl Into<String>) -> Self {
276        self.name = Some(name.into());
277        self
278    }
279
280    /// Add a single symbol. Repeated calls accumulate.
281    pub fn symbol(mut self, sym: impl Into<Symbol>) -> Self {
282        self.symbols.push(sym.into());
283        self
284    }
285
286    /// Add many symbols at once. Repeated calls accumulate.
287    pub fn symbols<I, S>(mut self, syms: I) -> Self
288    where
289        I: IntoIterator<Item = S>,
290        S: Into<Symbol>,
291    {
292        self.symbols.extend(syms.into_iter().map(Into::into));
293        self
294    }
295
296    /// Maximum time to wait for services to drain on shutdown.
297    pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
298        self.shutdown_timeout = Some(dur);
299        self
300    }
301
302    /// Disable the supervisor's signal handler — host drives shutdown.
303    pub fn without_signal_handler(mut self) -> Self {
304        self.install_signal_handler = Some(false);
305        self
306    }
307
308    /// Override the in-process market-data bus capacity (default 1024).
309    pub fn market_bus_capacity(mut self, cap: usize) -> Self {
310        self.market_bus_capacity = Some(cap);
311        self
312    }
313
314    /// Override the in-process signal-bus capacity (default 256).
315    pub fn signal_bus_capacity(mut self, cap: usize) -> Self {
316        self.signal_bus_capacity = Some(cap);
317        self
318    }
319
320    /// Enable best-effort `exchange.close_position` for non-flat positions
321    /// after the supervisor drains.
322    pub fn close_positions_on_shutdown(mut self, b: bool) -> Self {
323        self.close_positions_on_shutdown = Some(b);
324        self
325    }
326
327    /// Override the session-PnL config used for every symbol.
328    pub fn session_pnl_config(mut self, cfg: SessionPnlConfig) -> Self {
329        self.risk.session_pnl = cfg;
330        self
331    }
332
333    /// Override the circuit-breaker config used for every symbol.
334    pub fn circuit_breaker_config(mut self, cfg: CircuitBreakerConfig) -> Self {
335        self.risk.circuit_breaker = cfg;
336        self
337    }
338
339    /// Override the position-sizing config used for every symbol.
340    pub fn sizing_config(mut self, cfg: SizingConfig) -> Self {
341        self.risk.sizing = cfg;
342        self
343    }
344
345    /// Set the account-wide [`PortfolioRiskConfig`] — a daily-loss halt, a
346    /// max-concurrent-positions cap, and a gross-exposure cap applied across
347    /// all symbols. Unset, every limit is off (the per-symbol gates still apply).
348    ///
349    /// ```
350    /// use rustrade::{BotConfig, PortfolioRiskConfig};
351    ///
352    /// let cfg = BotConfig::builder()
353    ///     .name("bot")
354    ///     .symbols(["BTCUSDT", "ETHUSDT"])
355    ///     .without_signal_handler()
356    ///     .portfolio_config(PortfolioRiskConfig {
357    ///         max_daily_loss: -500.0,
358    ///         max_concurrent_positions: 3,
359    ///         max_gross_exposure: 50_000.0,
360    ///     })
361    ///     .build()
362    ///     .unwrap();
363    /// assert_eq!(cfg.portfolio.max_concurrent_positions, 3);
364    /// ```
365    pub fn portfolio_config(mut self, cfg: PortfolioRiskConfig) -> Self {
366        self.portfolio = cfg;
367        self
368    }
369
370    /// Set a per-symbol [`RiskConfig`] override. The given symbol then uses
371    /// this config (session-PnL cap, circuit breaker, sizing) instead of the
372    /// bot-wide default. Repeated calls for the same symbol replace the
373    /// previous override. Symbols without an override use the default.
374    ///
375    /// ```
376    /// use rustrade::{BotConfig, RiskConfig};
377    /// use rustrade::SessionPnlConfig;
378    ///
379    /// let cfg = BotConfig::builder()
380    ///     .name("bot")
381    ///     .symbols(["BTCUSDT", "DOGEUSDT"])
382    ///     .without_signal_handler()
383    ///     // tighter daily loss cap on the volatile alt
384    ///     .symbol_risk("DOGEUSDT", RiskConfig {
385    ///         session_pnl: SessionPnlConfig { loss_limit: -20.0 },
386    ///         ..Default::default()
387    ///     })
388    ///     .build()
389    ///     .unwrap();
390    /// assert_eq!(cfg.risk_for(&"DOGEUSDT".into()).session_pnl.loss_limit, -20.0);
391    /// ```
392    pub fn symbol_risk(mut self, symbol: impl Into<Symbol>, cfg: RiskConfig) -> Self {
393        self.per_symbol_risk.insert(symbol.into(), cfg);
394        self
395    }
396
397    /// Set a per-[`AssetClass`] [`RiskConfig`] override. Symbols whose
398    /// instrument reports this class use `cfg` (unless a per-symbol override
399    /// also exists, which wins). See [`RiskConfig::preset_for`] for ready-made
400    /// presets to start from.
401    ///
402    /// ```
403    /// use rustrade::{AssetClass, BotConfig, RiskConfig};
404    ///
405    /// let cfg = BotConfig::builder()
406    ///     .name("multi-asset")
407    ///     .symbols(["XBTUSDTM", "EURUSD"])
408    ///     .without_signal_handler()
409    ///     .class_risk(AssetClass::CryptoPerp, RiskConfig::crypto_perp())
410    ///     .class_risk(AssetClass::Fx, RiskConfig::fx())
411    ///     .build()
412    ///     .unwrap();
413    /// // FX symbols resolve to the FX preset (20× leverage) absent a
414    /// // per-symbol override.
415    /// assert_eq!(
416    ///     cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx).sizing.leverage,
417    ///     20
418    /// );
419    /// ```
420    pub fn class_risk(mut self, class: AssetClass, cfg: RiskConfig) -> Self {
421        self.per_class_risk.insert(class, cfg);
422        self
423    }
424
425    /// Validate and build. Returns `Error::Config` on any constraint
426    /// violation — the framework never panics on bad config.
427    pub fn build(self) -> Result<BotConfig> {
428        let name = self
429            .name
430            .filter(|n| !n.trim().is_empty())
431            .ok_or_else(|| Error::config("BotConfig.name is required and must not be empty"))?;
432
433        if self.symbols.is_empty() {
434            return Err(Error::config(
435                "BotConfig.symbols must contain at least one Symbol — \
436                 the position cache and risk-state map are pre-seeded per symbol",
437            ));
438        }
439
440        let market_bus_capacity = self
441            .market_bus_capacity
442            .unwrap_or(DEFAULT_MARKET_BUS_CAPACITY);
443        if market_bus_capacity == 0 {
444            return Err(Error::config(
445                "BotConfig.market_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
446            ));
447        }
448
449        let signal_bus_capacity = self
450            .signal_bus_capacity
451            .unwrap_or(DEFAULT_SIGNAL_BUS_CAPACITY);
452        if signal_bus_capacity == 0 {
453            return Err(Error::config(
454                "BotConfig.signal_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
455            ));
456        }
457
458        let shutdown_timeout = self.shutdown_timeout.unwrap_or(Duration::from_secs(30));
459        if shutdown_timeout.is_zero() {
460            return Err(Error::config(
461                "BotConfig.shutdown_timeout must be > 0 — drain needs a non-zero deadline",
462            ));
463        }
464
465        // Validate the default risk config and every per-symbol / per-class
466        // override with the same rules — bad config never reaches the runtime.
467        validate_risk(&self.risk, "BotConfig.risk")?;
468        for (sym, cfg) in &self.per_symbol_risk {
469            validate_risk(cfg, &format!("BotConfig.per_symbol_risk[{sym}]"))?;
470        }
471        for (class, cfg) in &self.per_class_risk {
472            validate_risk(cfg, &format!("BotConfig.per_class_risk[{class:?}]"))?;
473        }
474        validate_portfolio(&self.portfolio)?;
475
476        Ok(BotConfig {
477            name,
478            symbols: self.symbols,
479            shutdown_timeout,
480            install_signal_handler: self.install_signal_handler.unwrap_or(true),
481            market_bus_capacity,
482            signal_bus_capacity,
483            close_positions_on_shutdown: self.close_positions_on_shutdown.unwrap_or(false),
484            risk: self.risk,
485            per_symbol_risk: self.per_symbol_risk,
486            per_class_risk: self.per_class_risk,
487            portfolio: self.portfolio,
488        })
489    }
490}
491
492/// Validate a [`RiskConfig`] (the default or a per-symbol override).
493/// `ctx` names the offending field in the error.
494fn validate_risk(risk: &RiskConfig, ctx: &str) -> Result<()> {
495    if risk.session_pnl.loss_limit.is_nan() {
496        return Err(Error::config(format!(
497            "{ctx}.session_pnl.loss_limit must not be NaN"
498        )));
499    }
500    if !risk.sizing.margin_per_trade.is_finite() || risk.sizing.margin_per_trade < 0.0 {
501        return Err(Error::config(format!(
502            "{ctx}.sizing.margin_per_trade must be a finite non-negative number"
503        )));
504    }
505    Ok(())
506}
507
508/// Validate the account-wide [`PortfolioRiskConfig`]. `NaN` limits would make
509/// every comparison silently false (a disabled limit that looks enabled), so
510/// they're rejected; `±∞` is allowed as the explicit "disabled" sentinel.
511fn validate_portfolio(p: &PortfolioRiskConfig) -> Result<()> {
512    if p.max_daily_loss.is_nan() {
513        return Err(Error::config(
514            "BotConfig.portfolio.max_daily_loss must not be NaN (use f64::NEG_INFINITY to disable)",
515        ));
516    }
517    if p.max_gross_exposure.is_nan() {
518        return Err(Error::config(
519            "BotConfig.portfolio.max_gross_exposure must not be NaN (use f64::INFINITY to disable)",
520        ));
521    }
522    Ok(())
523}
524
525/// The embedded trading bot.
526///
527/// Owns a [`Supervisor`], an [`ExchangeClient`], one or more [`Brain`]s,
528/// and the in-process [`MarketDataBus`]. Created via [`Bot::new`]; run
529/// via [`Bot::run_until_shutdown`]; observed and steered via the
530/// [`BotHandle`] returned from [`Bot::handle`].
531///
532/// # Example
533///
534/// ```no_run
535/// use std::sync::Arc;
536/// use rustrade::{Bot, BotConfig};
537/// # struct MyBrain;
538/// # #[async_trait::async_trait]
539/// # impl rustrade_core::Brain for MyBrain {
540/// #     fn name(&self) -> &str { "x" }
541/// #     async fn on_event(&self, _: &rustrade_core::MarketDataEvent, _: &rustrade_core::Position)
542/// #         -> rustrade_core::Result<rustrade_core::Decision> {
543/// #         Ok(rustrade_core::Decision::hold())
544/// #     }
545/// #     async fn health(&self) -> rustrade_core::BrainHealth { rustrade_core::BrainHealth::ok() }
546/// # }
547/// # async fn run(exchange: Arc<dyn rustrade_core::ExchangeClient>) -> anyhow::Result<()> {
548/// let config = BotConfig::builder()
549///     .name("my-bot")
550///     .symbol("BTCUSDT")
551///     .build()?;
552///
553/// let bot = Bot::new(config, exchange, vec![Arc::new(MyBrain) as Arc<dyn rustrade_core::Brain>])?;
554/// let handle = bot.handle();
555///
556/// // Spawn the bot; ask it to shut down from elsewhere.
557/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
558/// handle.shutdown();
559/// task.await??;
560/// # Ok(())
561/// # }
562/// ```
563pub struct Bot {
564    config: BotConfig,
565    supervisor: Arc<Supervisor>,
566    exchange: Arc<dyn ExchangeClient>,
567    brains: Arc<Vec<Arc<dyn Brain>>>,
568    market_bus: MarketDataBus,
569    signal_bus: SignalBus,
570    positions: PositionCache,
571    risk: RiskStateMap,
572    metrics: Arc<dyn MetricsSink>,
573    state_store: Option<Arc<dyn StateStore>>,
574    persister_slot: crate::handle::PersisterSlot,
575    handle: BotHandle,
576    external_cancel: Option<CancellationToken>,
577    market_source: Option<Arc<dyn MarketSource>>,
578    fill_source: Option<Arc<dyn FillSource>>,
579    candle_pollers: Vec<CandlePollerSpec>,
580    order_tracker: OrderTracker,
581    order_tracking: Option<OrderTrackingSpec>,
582}
583
584/// Settings captured by `Bot::with_order_tracking`.
585struct OrderTrackingSpec {
586    ttl: Duration,
587    poll_cadence: Duration,
588}
589
590/// One registered `(symbol, interval, cadence)` for `Bot::with_candle_poller`.
591struct CandlePollerSpec {
592    source: Arc<dyn CandleSource>,
593    symbol: Symbol,
594    interval: Duration,
595    poll_cadence: Duration,
596    limit: usize,
597}
598
599impl Bot {
600    /// Construct a `Bot`. Validates that at least one brain is provided.
601    ///
602    /// The exchange client and brain set are immutable for the bot's
603    /// lifetime — to change them, build a new `Bot`.
604    pub fn new(
605        config: BotConfig,
606        exchange: Arc<dyn ExchangeClient>,
607        brains: Vec<Arc<dyn Brain>>,
608    ) -> Result<Self> {
609        if brains.is_empty() {
610            return Err(Error::config(
611                "Bot::new requires at least one Brain — empty brain list",
612            ));
613        }
614
615        // Multi-brain arbitration guard: no two brains may claim ownership of
616        // the same symbol via `Brain::owned_symbols` — that would let two
617        // strategies fight over one position. Brains returning `None` opt out
618        // and self-coordinate (current behaviour).
619        let mut claimed: HashMap<Symbol, String> = HashMap::new();
620        for brain in &brains {
621            let Some(syms) = brain.owned_symbols() else {
622                continue;
623            };
624            for sym in syms.into_iter().collect::<std::collections::HashSet<_>>() {
625                if let Some(other) = claimed.insert(sym.clone(), brain.name().to_string()) {
626                    return Err(Error::config(format!(
627                        "brains '{other}' and '{}' both declare ownership of {sym} — \
628                         overlapping owned_symbols would fight over one position",
629                        brain.name()
630                    )));
631                }
632            }
633        }
634
635        let supervisor = Arc::new(Supervisor::new(
636            SupervisorConfig::default()
637                .with_shutdown_timeout(config.shutdown_timeout)
638                .with_default_backoff(Default::default())
639                .pipe(|c| {
640                    if config.install_signal_handler {
641                        c
642                    } else {
643                        c.without_signal_handler()
644                    }
645                }),
646        ));
647
648        let market_bus = MarketDataBus::with_capacity(config.market_bus_capacity);
649        let signal_bus = SignalBus::with_capacity(config.signal_bus_capacity);
650        let positions = build_position_cache(&config.symbols);
651        let risk = build_risk_state(&config.symbols, |sym| {
652            // Resolve per-symbol → per-class (by the adapter's asset class) →
653            // default, so a multi-asset bot gets the right gates per symbol.
654            let class = exchange.instrument_spec(sym).asset_class;
655            let r = config.resolve_risk(sym, class);
656            (r.session_pnl.clone(), r.circuit_breaker.clone())
657        });
658
659        let brains = Arc::new(brains);
660        let persister_slot: crate::handle::PersisterSlot = Arc::new(std::sync::OnceLock::new());
661        let order_tracker = OrderTracker::new();
662        let handle = BotHandle::new(
663            supervisor.clone(),
664            brains.clone(),
665            risk.clone(),
666            positions.clone(),
667            signal_bus.clone(),
668            persister_slot.clone(),
669            order_tracker.clone(),
670        );
671
672        Ok(Self {
673            config,
674            supervisor,
675            exchange,
676            brains,
677            market_bus,
678            signal_bus,
679            positions,
680            risk,
681            metrics: Arc::new(NoopSink),
682            state_store: None,
683            persister_slot,
684            order_tracker,
685            handle,
686            external_cancel: None,
687            market_source: None,
688            fill_source: None,
689            candle_pollers: Vec::new(),
690            order_tracking: None,
691        })
692    }
693
694    /// Install a [`MetricsSink`]. The framework's services emit
695    /// counters and histograms to this sink on every observable event;
696    /// the default is [`NoopSink`], which discards everything.
697    pub fn with_metrics(mut self, sink: Arc<dyn MetricsSink>) -> Self {
698        self.metrics = sink;
699        self
700    }
701
702    /// Install a [`StateStore`] so per-symbol risk state (session PnL and
703    /// circuit breaker) survives restarts.
704    ///
705    /// Without a store, risk state is in-memory only: a crash mid-session
706    /// resets the daily drawdown cap and the loss-streak breaker. With one
707    /// wired, the bot:
708    ///
709    /// - **Restores** each symbol's snapshot on startup, then applies the
710    ///   stale-snapshot policy — a session from an earlier UTC day rolls
711    ///   over to fresh, and a breaker whose cooldown elapsed during
712    ///   downtime auto-resets.
713    /// - **Persists** after every realised trade (whether fed via
714    ///   [`BotHandle::record_trade_outcome`](crate::BotHandle::record_trade_outcome)
715    ///   or auto-routed by the `FillRoutingService`).
716    /// - **Flushes** on graceful shutdown.
717    ///
718    /// Use [`rustrade_core::InMemoryStore`] for a non-durable default, or a
719    /// disk-/database-backed implementation from a downstream crate for
720    /// real durability. Snapshots are keyed by `(bot name, symbol)`, so
721    /// distinct bots can share one backend without collision.
722    pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
723        self.state_store = Some(store);
724        self
725    }
726
727    /// Enable resting-order lifecycle tracking.
728    ///
729    /// The [`ExecutionService`] records every resting order it places (limit
730    /// / post-only / IOC / FOK — market orders never rest, so they're
731    /// skipped), and a supervised [`OrderReaperService`] periodically:
732    ///
733    /// - **reconciles** the tracker against `exchange.get_open_orders` (so
734    ///   orders filled or cancelled out-of-band stop being tracked), and
735    /// - **cancels** any resting order older than `ttl` via
736    ///   `exchange.cancel_order`.
737    ///
738    /// `poll_cadence` is how often a sweep runs. The reaper is only spawned
739    /// if the adapter advertises [`Capability::OrderTracking`]; otherwise the
740    /// call is a no-op (with a warning at startup), since the framework can't
741    /// list or cancel orders without it. Live tracked orders are visible via
742    /// [`BotHandle::tracked_orders`](crate::BotHandle::tracked_orders).
743    pub fn with_order_tracking(mut self, ttl: Duration, poll_cadence: Duration) -> Self {
744        self.order_tracking = Some(OrderTrackingSpec { ttl, poll_cadence });
745        self
746    }
747
748    /// Register a [`CandleSource`] to be polled every `poll_cadence` for
749    /// `(symbol, interval)`. Polled candles are deduplicated by
750    /// timestamp and published to the bot's [`MarketDataBus`]. Repeated
751    /// calls accumulate — one supervised service per registered tuple.
752    pub fn with_candle_poller(
753        mut self,
754        source: Arc<dyn CandleSource>,
755        symbol: impl Into<Symbol>,
756        interval: Duration,
757        poll_cadence: Duration,
758        limit: usize,
759    ) -> Self {
760        self.candle_pollers.push(CandlePollerSpec {
761            source,
762            symbol: symbol.into(),
763            interval,
764            poll_cadence,
765            limit,
766        });
767        self
768    }
769
770    /// Tie this bot's shutdown to an externally-owned cancellation token.
771    ///
772    /// When the external token is cancelled, the bot's supervisor token
773    /// is cancelled too — equivalent to calling [`BotHandle::shutdown`]
774    /// but without spawning a linker task in the host.
775    ///
776    /// The reverse is not true: cancelling the bot does not cancel the
777    /// external token.
778    pub fn with_external_cancel(mut self, token: CancellationToken) -> Self {
779        self.external_cancel = Some(token);
780        self
781    }
782
783    /// Attach a [`MarketSource`] to be driven by a supervised
784    /// [`MarketFeedService`]. Source implementors are responsible for
785    /// publishing to the bot's [`MarketDataBus`] (obtain via
786    /// `bot.market_data_bus().clone()` before constructing the source).
787    pub fn with_market_source(mut self, source: Arc<dyn MarketSource>) -> Self {
788        self.market_source = Some(source);
789        self
790    }
791
792    /// Attach a [`FillSource`] to be driven by a supervised
793    /// [`FillRoutingService`]. Fills are routed to every brain via
794    /// `Brain::on_fill` and the position cache is refreshed from the
795    /// exchange after each one.
796    pub fn with_fill_source(mut self, source: Arc<dyn FillSource>) -> Self {
797        self.fill_source = Some(source);
798        self
799    }
800
801    /// Cheap cloneable handle for host services. Can be obtained at any
802    /// point — call before [`Self::run_until_shutdown`] so the host can
803    /// drive shutdown while the bot is running.
804    pub fn handle(&self) -> BotHandle {
805        self.handle.clone()
806    }
807
808    /// Reference to the bot's configuration.
809    pub fn config(&self) -> &BotConfig {
810        &self.config
811    }
812
813    /// Borrow the in-process market-data bus. Host services and adapters
814    /// publish here; the bot's framework services subscribe.
815    pub fn market_data_bus(&self) -> &MarketDataBus {
816        &self.market_bus
817    }
818
819    /// Borrow the in-process signal bus. The execution service publishes
820    /// a [`Signal`](rustrade_core::Signal) to this bus on every
821    /// non-`Hold` decision the brain emits; host services subscribe via
822    /// [`BotHandle::subscribe_signals`].
823    pub fn signal_bus(&self) -> &SignalBus {
824        &self.signal_bus
825    }
826
827    /// Spawn the framework services and run until shutdown.
828    ///
829    /// Returns after all spawned services have drained (or the configured
830    /// shutdown timeout elapses). Consumes `self` to make the
831    /// "construct → run → exit" lifecycle explicit; persistent observation
832    /// of the running bot is done via the [`BotHandle`] obtained earlier.
833    ///
834    /// # Runtime requirements
835    ///
836    /// - **Multi-thread tokio runtime.** The supervisor spawns each
837    ///   service onto `tokio::spawn`. A current-thread runtime works
838    ///   for small loads but loses the per-service parallelism the
839    ///   framework is designed for. Use
840    ///   `#[tokio::main(flavor = "multi_thread")]` or
841    ///   `tokio::runtime::Builder::new_multi_thread()` in the host.
842    /// - **`tokio::spawn` is used internally.** Anywhere the host
843    ///   embeds this method, a tokio runtime context must be active.
844    /// - **No nested runtimes.** `Bot::run_until_shutdown` is async; do
845    ///   not call `block_on` on it from inside another runtime.
846    ///
847    /// # Resource expectations
848    ///
849    /// - **Memory per active symbol:** O(few hundred bytes) for the
850    ///   position cache entry, `SymbolRisk` (a `SessionPnl` plus a
851    ///   `CircuitBreaker` whose ring buffer is bounded by
852    ///   `loss_limit`), plus the per-symbol slot in any host-owned
853    ///   subscriber.
854    /// - **Channel buffers:** `market_bus_capacity` + `signal_bus_capacity`
855    ///   slots per bus, each slot holding a clone of `MarketDataEvent` /
856    ///   `Signal`. Drop-oldest semantics — back-pressure is *not*
857    ///   propagated to publishers.
858    /// - **Expected shutdown time:** ≤ `shutdown_timeout`. A
859    ///   well-behaved service responds to its cancel token in
860    ///   milliseconds; the timeout is the worst-case bound, not the
861    ///   typical case.
862    /// - **Restart-after-crash latency:** bounded by
863    ///   [`BackoffConfig`](rustrade_supervisor::BackoffConfig). Defaults:
864    ///   100 ms base, 60 s cap, 10 retries within a 10-minute window
865    ///   before the circuit breaker trips.
866    pub async fn run_until_shutdown(self) -> anyhow::Result<()> {
867        tracing::info!(
868            bot = %self.config.name,
869            brains = self.brains.len(),
870            symbols = self.config.symbols.len(),
871            exchange = %self.exchange.name(),
872            "rustrade Bot starting"
873        );
874
875        // Best-effort position prefetch — failures don't block startup.
876        self.prefetch_positions().await;
877
878        // Restore persisted risk state (if a store is wired) before any
879        // service runs, then publish the persister so the per-trade paths
880        // can save through it.
881        let persister = self
882            .state_store
883            .clone()
884            .map(|store| RiskPersister::new(store, self.config.name.clone()));
885        if let Some(p) = &persister {
886            p.restore_into(&self.risk).await;
887            // OnceLock: set is infallible the first (only) time per bot.
888            let _ = self.persister_slot.set(p.clone());
889        }
890
891        // Order tracking: only active when wired AND the adapter can list +
892        // cancel orders. Gate once here so both the execution service (which
893        // records resting orders) and the reaper agree.
894        let order_tracking_active =
895            self.order_tracking.is_some() && self.exchange.supports(Capability::OrderTracking);
896        if self.order_tracking.is_some() && !order_tracking_active {
897            tracing::warn!(
898                exchange = %self.exchange.name(),
899                "order tracking requested but adapter lacks Capability::OrderTracking — \
900                 resting orders will NOT be tracked or aged out"
901            );
902        }
903
904        // Bracket (SL+TP / OCO) orders need three things: place protective
905        // stops (StopOrders), cancel the sibling on fill (OrderTracking), and
906        // detect that fill (a fill source). When all hold, a shared OCO
907        // registry links each bracket's legs; otherwise a brain emitting both
908        // SL+TP falls back to a single attached stop-loss (see
909        // `ExecutionService::attach_protection`).
910        let brackets_active = self.exchange.supports(Capability::StopOrders)
911            && self.exchange.supports(Capability::OrderTracking)
912            && self.fill_source.is_some();
913        let oco = brackets_active.then(crate::order_tracker::OcoRegistry::new);
914        tracing::info!(brackets_active, "bracket (SL+TP/OCO) support");
915
916        // Resolve each symbol's effective sizing (per-symbol → per-class by
917        // asset class → default) so multi-asset bots size each class correctly.
918        let sizing = Arc::new(crate::execution::SymbolSizing::new(
919            self.config.risk.sizing.clone(),
920            self.config
921                .symbols
922                .iter()
923                .map(|s| {
924                    let class = self.exchange.instrument_spec(s).asset_class;
925                    (s.clone(), self.config.resolve_risk(s, class).sizing.clone())
926                })
927                .collect(),
928        ));
929        // Account-wide portfolio risk, shared between the execution gate
930        // (reads it) and the risk sweep (maintains its daily-loss latch).
931        let portfolio: PortfolioRiskState = build_portfolio_risk(self.config.portfolio.clone());
932
933        let ctx = ExecutionContext {
934            exchange: self.exchange.clone(),
935            bus: self.market_bus.clone(),
936            signals: self.signal_bus.clone(),
937            positions: self.positions.clone(),
938            risk: self.risk.clone(),
939            portfolio: portfolio.clone(),
940            sizing,
941            order_tracker: order_tracking_active.then(|| self.order_tracker.clone()),
942            oco: oco.clone(),
943        };
944
945        for brain in self.brains.iter() {
946            let svc = ExecutionService::new(brain.clone(), ctx.clone());
947            self.supervisor.spawn_service(Box::new(svc));
948        }
949
950        // Periodic risk sweep: rolls per-symbol SessionPnl / CircuitBreaker and
951        // the portfolio halt over at 00:00 UTC during a live run (without it,
952        // tick() only fires on restart), and re-derives the account daily-loss
953        // latch from the per-symbol session PnLs.
954        self.supervisor
955            .spawn_service(Box::new(RiskSweepService::new(
956                self.risk.clone(),
957                portfolio.clone(),
958                RISK_SWEEP_CADENCE,
959            )));
960
961        if order_tracking_active {
962            // Safe: order_tracking_active implies order_tracking is Some.
963            let spec = self.order_tracking.as_ref().unwrap();
964            self.supervisor
965                .spawn_service(Box::new(OrderReaperService::new(
966                    self.exchange.clone(),
967                    self.order_tracker.clone(),
968                    self.config.symbols.clone(),
969                    spec.ttl,
970                    spec.poll_cadence,
971                    self.metrics.clone(),
972                )));
973        }
974
975        if let Some(source) = self.market_source.clone() {
976            self.supervisor
977                .spawn_service(Box::new(MarketFeedService::new(source)));
978        }
979
980        if let Some(source) = self.fill_source.clone() {
981            self.supervisor
982                .spawn_service(Box::new(FillRoutingService::new(
983                    source,
984                    self.brains.clone(),
985                    self.exchange.clone(),
986                    self.positions.clone(),
987                    self.risk.clone(),
988                    self.metrics.clone(),
989                    persister.clone(),
990                    oco.clone(),
991                )));
992        }
993
994        for spec in &self.candle_pollers {
995            self.supervisor
996                .spawn_service(Box::new(CandlePollerService::new(
997                    spec.source.clone(),
998                    spec.symbol.clone(),
999                    spec.interval,
1000                    spec.poll_cadence,
1001                    spec.limit,
1002                    self.market_bus.clone(),
1003                    self.metrics.clone(),
1004                )));
1005        }
1006
1007        // External cancellation linker: when the host's token fires,
1008        // cancel the supervisor's root token. The reverse is not wired.
1009        if let Some(external) = self.external_cancel.clone() {
1010            let supervisor = self.supervisor.clone();
1011            tokio::spawn(async move {
1012                external.cancelled().await;
1013                tracing::info!("external cancellation received; triggering bot shutdown");
1014                supervisor.trigger_shutdown();
1015            });
1016        }
1017
1018        let run_result = self.supervisor.run_until_shutdown().await;
1019
1020        if self.config.close_positions_on_shutdown {
1021            self.close_open_positions().await;
1022        }
1023
1024        // Persist final risk state + flush before exit so the next boot
1025        // restores an up-to-date snapshot.
1026        if let Some(p) = &persister {
1027            p.persist_all(&self.risk).await;
1028        }
1029
1030        for brain in self.brains.iter() {
1031            let health = brain.health().await;
1032            tracing::info!(
1033                brain = %brain.name(),
1034                healthy = health.healthy,
1035                events = health.events_processed,
1036                non_hold = health.non_hold_decisions,
1037                "final brain health"
1038            );
1039        }
1040
1041        tracing::info!(bot = %self.config.name, "rustrade Bot exited");
1042        run_result
1043    }
1044
1045    async fn prefetch_positions(&self) {
1046        for symbol in &self.config.symbols {
1047            match self.exchange.get_position(symbol).await {
1048                Ok(pos) => {
1049                    self.positions.write().await.insert(symbol.clone(), pos);
1050                    tracing::debug!(
1051                        symbol = %symbol,
1052                        qty = pos.qty,
1053                        "prefetched position from exchange"
1054                    );
1055                }
1056                Err(e) => {
1057                    tracing::warn!(
1058                        symbol = %symbol,
1059                        error = %e,
1060                        "failed to prefetch position; cache defaults to FLAT"
1061                    );
1062                }
1063            }
1064        }
1065    }
1066
1067    async fn close_open_positions(&self) {
1068        let snapshot: Vec<(Symbol, Position)> = {
1069            let map = self.positions.read().await;
1070            map.iter()
1071                .filter(|(_, p)| !p.is_flat())
1072                .map(|(s, p)| (s.clone(), *p))
1073                .collect()
1074        };
1075
1076        if snapshot.is_empty() {
1077            tracing::info!("close_positions_on_shutdown: no open positions");
1078            return;
1079        }
1080
1081        for (symbol, position) in snapshot {
1082            match self.exchange.close_position(&symbol, &position).await {
1083                Ok(order_id) => tracing::info!(
1084                    symbol = %symbol,
1085                    qty = position.qty,
1086                    order_id = %order_id,
1087                    "close_positions_on_shutdown: closed"
1088                ),
1089                Err(e) => tracing::error!(
1090                    symbol = %symbol,
1091                    qty = position.qty,
1092                    error = %e,
1093                    "close_positions_on_shutdown: failed (best-effort)"
1094                ),
1095            }
1096        }
1097    }
1098}
1099
1100// Tiny `pipe` helper local to this module for builder ergonomics — keeps
1101// the `Bot::new` body readable when conditionally applying builder methods.
1102trait Pipe: Sized {
1103    fn pipe<F: FnOnce(Self) -> Self>(self, f: F) -> Self {
1104        f(self)
1105    }
1106}
1107impl<T> Pipe for T {}
1108
1109#[cfg(test)]
1110mod tests {
1111    use super::*;
1112    use async_trait::async_trait;
1113    use rustrade_core::{Fill, MarketDataEvent, Order, Position};
1114
1115    #[test]
1116    fn presets_differ_in_leverage_by_class() {
1117        assert_eq!(RiskConfig::crypto_perp().sizing.leverage, 5);
1118        assert_eq!(RiskConfig::crypto_spot().sizing.leverage, 1);
1119        assert_eq!(RiskConfig::fx().sizing.leverage, 20);
1120        assert_eq!(RiskConfig::futures().sizing.leverage, 10);
1121        assert_eq!(RiskConfig::equity().sizing.leverage, 1);
1122    }
1123
1124    #[test]
1125    fn preset_for_maps_each_class_with_perp_fallback() {
1126        assert_eq!(
1127            RiskConfig::preset_for(AssetClass::CryptoSpot)
1128                .sizing
1129                .leverage,
1130            1
1131        );
1132        assert_eq!(RiskConfig::preset_for(AssetClass::Fx).sizing.leverage, 20);
1133        assert_eq!(
1134            RiskConfig::preset_for(AssetClass::Future).sizing.leverage,
1135            10
1136        );
1137        assert_eq!(
1138            RiskConfig::preset_for(AssetClass::Equity).sizing.leverage,
1139            1
1140        );
1141        // CryptoPerp and the catch-all (`Other` / future classes) → perp shape.
1142        assert_eq!(
1143            RiskConfig::preset_for(AssetClass::CryptoPerp)
1144                .sizing
1145                .leverage,
1146            5
1147        );
1148        assert_eq!(RiskConfig::preset_for(AssetClass::Other).sizing.leverage, 5);
1149    }
1150
1151    #[test]
1152    fn resolve_risk_precedence_symbol_then_class_then_default() {
1153        let cfg = BotConfig::builder()
1154            .name("t")
1155            .symbols(["XBTUSDTM", "EURUSD", "ETHUSDTM"])
1156            .without_signal_handler()
1157            .class_risk(AssetClass::Fx, RiskConfig::fx())
1158            .symbol_risk(
1159                "ETHUSDTM",
1160                RiskConfig {
1161                    sizing: SizingConfig {
1162                        margin_per_trade: 1.0,
1163                        leverage: 7,
1164                        max_contracts: 1,
1165                    },
1166                    ..Default::default()
1167                },
1168            )
1169            .build()
1170            .unwrap();
1171
1172        // Per-symbol override wins even when a class override would also match.
1173        assert_eq!(
1174            cfg.resolve_risk(&"ETHUSDTM".into(), AssetClass::Fx)
1175                .sizing
1176                .leverage,
1177            7
1178        );
1179        // No per-symbol override → the per-class override applies.
1180        assert_eq!(
1181            cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx)
1182                .sizing
1183                .leverage,
1184            20
1185        );
1186        // Neither → the bot-wide default.
1187        let default_lev = RiskConfig::default().sizing.leverage;
1188        assert_eq!(
1189            cfg.resolve_risk(&"XBTUSDTM".into(), AssetClass::CryptoPerp)
1190                .sizing
1191                .leverage,
1192            default_lev
1193        );
1194    }
1195
1196    struct NoopBrain;
1197    #[async_trait]
1198    impl Brain for NoopBrain {
1199        fn name(&self) -> &str {
1200            "noop"
1201        }
1202        async fn on_event(
1203            &self,
1204            _e: &MarketDataEvent,
1205            _p: &Position,
1206        ) -> Result<rustrade_core::Decision> {
1207            Ok(rustrade_core::Decision::hold())
1208        }
1209    }
1210
1211    /// Brain that declares ownership of a fixed symbol set, for the
1212    /// multi-brain arbitration guard tests.
1213    struct OwningBrain {
1214        name: &'static str,
1215        owns: Vec<Symbol>,
1216    }
1217    #[async_trait]
1218    impl Brain for OwningBrain {
1219        fn name(&self) -> &str {
1220            self.name
1221        }
1222        fn owned_symbols(&self) -> Option<Vec<Symbol>> {
1223            Some(self.owns.clone())
1224        }
1225        async fn on_event(
1226            &self,
1227            _e: &MarketDataEvent,
1228            _p: &Position,
1229        ) -> Result<rustrade_core::Decision> {
1230            Ok(rustrade_core::Decision::hold())
1231        }
1232    }
1233
1234    struct NoopExchange;
1235    #[async_trait]
1236    impl ExchangeClient for NoopExchange {
1237        fn name(&self) -> &str {
1238            "noop"
1239        }
1240        async fn place_order(&self, _o: &Order) -> Result<String> {
1241            Ok("noop-1".into())
1242        }
1243        async fn cancel_all(&self, _s: &Symbol) -> Result<usize> {
1244            Ok(0)
1245        }
1246        async fn close_position(&self, _s: &Symbol, _p: &Position) -> Result<String> {
1247            Ok("noop-close".into())
1248        }
1249        async fn get_position(&self, _s: &Symbol) -> Result<Position> {
1250            Ok(Position::FLAT)
1251        }
1252        async fn get_balance(&self, _c: &str) -> Result<f64> {
1253            Ok(0.0)
1254        }
1255    }
1256
1257    fn cfg() -> BotConfig {
1258        BotConfig::builder()
1259            .name("test")
1260            .symbol("BTCUSDT")
1261            .without_signal_handler()
1262            .build()
1263            .unwrap()
1264    }
1265
1266    #[test]
1267    fn builder_requires_name() {
1268        let err = BotConfig::builder().build().unwrap_err();
1269        assert!(matches!(err, Error::Config(_)), "got {err:?}");
1270    }
1271
1272    #[test]
1273    fn builder_rejects_blank_name() {
1274        let err = BotConfig::builder().name("   ").build().unwrap_err();
1275        assert!(matches!(err, Error::Config(_)), "got {err:?}");
1276    }
1277
1278    #[test]
1279    fn builder_rejects_zero_market_bus_capacity() {
1280        let err = BotConfig::builder()
1281            .name("x")
1282            .symbol("BTCUSDT")
1283            .market_bus_capacity(0)
1284            .build()
1285            .unwrap_err();
1286        assert!(matches!(err, Error::Config(_)));
1287    }
1288
1289    #[test]
1290    fn builder_rejects_zero_signal_bus_capacity() {
1291        let err = BotConfig::builder()
1292            .name("x")
1293            .symbol("BTCUSDT")
1294            .signal_bus_capacity(0)
1295            .build()
1296            .unwrap_err();
1297        assert!(matches!(err, Error::Config(_)));
1298    }
1299
1300    #[test]
1301    fn builder_rejects_empty_symbol_list() {
1302        let err = BotConfig::builder().name("x").build().unwrap_err();
1303        assert!(matches!(err, Error::Config(_)));
1304    }
1305
1306    #[test]
1307    fn builder_rejects_zero_shutdown_timeout() {
1308        let err = BotConfig::builder()
1309            .name("x")
1310            .symbol("BTCUSDT")
1311            .shutdown_timeout(Duration::ZERO)
1312            .build()
1313            .unwrap_err();
1314        assert!(matches!(err, Error::Config(_)));
1315    }
1316
1317    #[test]
1318    fn builder_rejects_nan_loss_limit() {
1319        let err = BotConfig::builder()
1320            .name("x")
1321            .symbol("BTCUSDT")
1322            .session_pnl_config(SessionPnlConfig {
1323                loss_limit: f64::NAN,
1324            })
1325            .build()
1326            .unwrap_err();
1327        assert!(matches!(err, Error::Config(_)));
1328    }
1329
1330    #[test]
1331    fn builder_rejects_non_finite_margin() {
1332        let err = BotConfig::builder()
1333            .name("x")
1334            .symbol("BTCUSDT")
1335            .sizing_config(SizingConfig {
1336                margin_per_trade: f64::INFINITY,
1337                leverage: 1,
1338                max_contracts: 1,
1339            })
1340            .build()
1341            .unwrap_err();
1342        assert!(matches!(err, Error::Config(_)));
1343    }
1344
1345    #[test]
1346    fn builder_accumulates_symbols() {
1347        let c = BotConfig::builder()
1348            .name("x")
1349            .symbol("A")
1350            .symbols(["B", "C"])
1351            .build()
1352            .unwrap();
1353        assert_eq!(c.symbols.len(), 3);
1354        assert_eq!(c.symbols[0], Symbol::new("A"));
1355        assert_eq!(c.symbols[2], Symbol::new("C"));
1356    }
1357
1358    #[test]
1359    fn builder_accepts_risk_overrides() {
1360        let c = BotConfig::builder()
1361            .name("x")
1362            .symbol("BTCUSDT")
1363            .session_pnl_config(SessionPnlConfig { loss_limit: -123.0 })
1364            .sizing_config(SizingConfig {
1365                margin_per_trade: 250.0,
1366                leverage: 10,
1367                max_contracts: 5,
1368            })
1369            .build()
1370            .unwrap();
1371        assert_eq!(c.risk.session_pnl.loss_limit, -123.0);
1372        assert_eq!(c.risk.sizing.leverage, 10);
1373    }
1374
1375    #[test]
1376    fn per_symbol_risk_override_and_fallback() {
1377        let c = BotConfig::builder()
1378            .name("x")
1379            .symbols(["BTCUSDT", "ETHUSDT"])
1380            .session_pnl_config(SessionPnlConfig { loss_limit: -100.0 }) // default
1381            .symbol_risk(
1382                "BTCUSDT",
1383                RiskConfig {
1384                    session_pnl: SessionPnlConfig { loss_limit: -25.0 },
1385                    ..Default::default()
1386                },
1387            )
1388            .build()
1389            .unwrap();
1390        // BTC uses its override; ETH (no override) uses the default.
1391        assert_eq!(
1392            c.risk_for(&Symbol::from("BTCUSDT")).session_pnl.loss_limit,
1393            -25.0
1394        );
1395        assert_eq!(
1396            c.risk_for(&Symbol::from("ETHUSDT")).session_pnl.loss_limit,
1397            -100.0
1398        );
1399    }
1400
1401    #[test]
1402    fn builder_rejects_invalid_per_symbol_override() {
1403        let err = BotConfig::builder()
1404            .name("x")
1405            .symbol("BTCUSDT")
1406            .symbol_risk(
1407                "BTCUSDT",
1408                RiskConfig {
1409                    session_pnl: SessionPnlConfig {
1410                        loss_limit: f64::NAN,
1411                    },
1412                    ..Default::default()
1413                },
1414            )
1415            .build()
1416            .unwrap_err();
1417        assert!(matches!(err, Error::Config(_)), "got {err:?}");
1418    }
1419
1420    #[test]
1421    fn builder_has_separate_default_bus_capacities() {
1422        let c = BotConfig::builder()
1423            .name("x")
1424            .symbol("BTCUSDT")
1425            .build()
1426            .unwrap();
1427        assert_eq!(c.market_bus_capacity, 1024);
1428        assert_eq!(c.signal_bus_capacity, 256);
1429    }
1430
1431    #[tokio::test]
1432    async fn bot_requires_at_least_one_brain() {
1433        match Bot::new(cfg(), Arc::new(NoopExchange), vec![]) {
1434            Err(Error::Config(_)) => {}
1435            other => panic!(
1436                "expected Error::Config for empty brain list, got {:?}",
1437                other.map(|_| "Ok(Bot)").map_err(|e| format!("Err({e})"))
1438            ),
1439        }
1440    }
1441
1442    #[tokio::test]
1443    async fn bot_constructs_and_exposes_handle() {
1444        let bot = Bot::new(cfg(), Arc::new(NoopExchange), vec![Arc::new(NoopBrain)]).unwrap();
1445        let handle = bot.handle();
1446        assert!(!handle.is_shutting_down());
1447        assert_eq!(bot.config().name, "test");
1448        let h2 = handle.clone();
1449        assert!(!h2.is_shutting_down());
1450    }
1451
1452    #[tokio::test]
1453    async fn rejects_overlapping_owned_symbols() {
1454        // Two brains both claim BTCUSDT → config error.
1455        let cfg = BotConfig::builder()
1456            .name("x")
1457            .symbols(["BTCUSDT", "ETHUSDT"])
1458            .without_signal_handler()
1459            .build()
1460            .unwrap();
1461        let a = Arc::new(OwningBrain {
1462            name: "a",
1463            owns: vec![Symbol::from("BTCUSDT")],
1464        });
1465        let b = Arc::new(OwningBrain {
1466            name: "b",
1467            owns: vec![Symbol::from("BTCUSDT"), Symbol::from("ETHUSDT")],
1468        });
1469        assert!(
1470            matches!(
1471                Bot::new(cfg, Arc::new(NoopExchange), vec![a, b]),
1472                Err(Error::Config(_))
1473            ),
1474            "overlapping owned_symbols must be rejected"
1475        );
1476    }
1477
1478    #[tokio::test]
1479    async fn accepts_disjoint_owned_symbols() {
1480        // Disjoint ownership is fine.
1481        let cfg = BotConfig::builder()
1482            .name("x")
1483            .symbols(["BTCUSDT", "ETHUSDT"])
1484            .without_signal_handler()
1485            .build()
1486            .unwrap();
1487        let a = Arc::new(OwningBrain {
1488            name: "a",
1489            owns: vec![Symbol::from("BTCUSDT")],
1490        });
1491        let b = Arc::new(OwningBrain {
1492            name: "b",
1493            owns: vec![Symbol::from("ETHUSDT")],
1494        });
1495        assert!(Bot::new(cfg, Arc::new(NoopExchange), vec![a, b]).is_ok());
1496    }
1497
1498    #[tokio::test]
1499    async fn none_owners_are_not_guarded() {
1500        // Two `None` (catch-all) brains coexist — they opt out of the guard.
1501        let bot = Bot::new(
1502            cfg(),
1503            Arc::new(NoopExchange),
1504            vec![Arc::new(NoopBrain), Arc::new(NoopBrain)],
1505        );
1506        assert!(bot.is_ok());
1507    }
1508
1509    #[allow(dead_code)]
1510    fn _noop_fill_compiles(_: &Fill) {}
1511}