rustrade/bot.rs
1//! The [`Bot`] entry point and its [`BotConfig`] builder.
2//!
3//! `Bot::new` validates configuration and constructs a supervised runtime.
4//! `Bot::run_until_shutdown` starts the framework services and blocks
5//! until shutdown is triggered (via signal or [`BotHandle::shutdown`]).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use rustrade_core::{
12 AssetClass, Brain, CandleSource, Capability, Error, ExchangeClient, FillSource, MarketDataBus,
13 MarketSource, MetricsSink, NoopSink, Position, Result, SignalBus, StateStore, Symbol,
14};
15use rustrade_risk::{CircuitBreakerConfig, PortfolioRiskConfig, SessionPnlConfig, SizingConfig};
16use rustrade_supervisor::{Supervisor, SupervisorConfig};
17use tokio_util::sync::CancellationToken;
18
19use crate::execution::{ExecutionContext, ExecutionService};
20use crate::handle::BotHandle;
21use crate::order_tracker::{OrderReaperService, OrderTracker};
22use crate::risk_state::{
23 PortfolioRiskState, PositionCache, RiskPersister, RiskStateMap, build_portfolio_risk,
24 build_position_cache, build_risk_state,
25};
26use crate::risk_sweep::RiskSweepService;
27use crate::services::{CandlePollerService, FillRoutingService, MarketFeedService};
28
/// Market-data bus capacity used when the builder's
/// `market_bus_capacity` is left unset.
const DEFAULT_MARKET_BUS_CAPACITY: usize = 1024;
/// Signal bus capacity used when the builder's `signal_bus_capacity` is
/// left unset.
const DEFAULT_SIGNAL_BUS_CAPACITY: usize = 256;
/// How often the risk sweep runs (every 15 s): detects the 00:00 UTC
/// rollover for the per-symbol and account risk state, and refreshes the
/// portfolio daily-loss latch. Coarse on purpose — these are day-scale
/// controls.
const RISK_SWEEP_CADENCE: Duration = Duration::from_secs(15);

/// How long (30 s) a pending-entry reservation counts against the
/// portfolio gate before it expires. Long enough for any realistic
/// market-order fill → position-cache refresh round trip; short enough
/// that a setup without a fill source (whose cache never updates)
/// recovers on its own.
const PENDING_ENTRY_TTL: Duration = Duration::from_secs(30);
41
/// Risk-layer config for a symbol: session-PnL cap, circuit breaker, and
/// position sizing. Used both as the bot-wide default
/// ([`BotConfig::risk`]) and as a per-symbol override
/// ([`BotConfig::per_symbol_risk`], set via
/// [`BotConfigBuilder::symbol_risk`]).
///
/// The same type is also stored per [`AssetClass`] in
/// [`BotConfig::per_class_risk`] (set via [`BotConfigBuilder::class_risk`]).
/// `Default` yields the `Default` of each of the three nested configs.
#[derive(Debug, Clone, Default)]
pub struct RiskConfig {
    /// Session PnL config. As the bot-wide default it applies to every
    /// configured symbol without an override; as an override it applies
    /// only to the symbol / class it is registered for.
    pub session_pnl: SessionPnlConfig,
    /// Circuit-breaker config, with the same default-vs-override scope as
    /// [`Self::session_pnl`].
    pub circuit_breaker: CircuitBreakerConfig,
    /// Position-sizing config used by the execution service.
    pub sizing: SizingConfig,
}
56
57impl RiskConfig {
58 /// Per-[`AssetClass`] starting presets. These are **sensible defaults to
59 /// tune**, not prescriptions — they mainly differ in leverage (the most
60 /// class-defining knob) and per-trade size; tighten or loosen for your
61 /// venue and risk appetite. Apply one per class via
62 /// [`BotConfigBuilder::class_risk`], or per symbol via
63 /// [`BotConfigBuilder::symbol_risk`].
64 ///
65 /// Crypto perpetuals: moderate 5× leverage (the framework default shape).
66 #[must_use]
67 pub fn crypto_perp() -> Self {
68 Self {
69 session_pnl: SessionPnlConfig { loss_limit: -50.0 },
70 circuit_breaker: CircuitBreakerConfig::default(),
71 sizing: SizingConfig {
72 margin_per_trade: 500.0,
73 leverage: 5,
74 max_contracts: 50,
75 },
76 }
77 }
78
79 /// Crypto spot: **no leverage** (1×) — one unit traded is one base unit.
80 #[must_use]
81 pub fn crypto_spot() -> Self {
82 Self {
83 session_pnl: SessionPnlConfig { loss_limit: -50.0 },
84 circuit_breaker: CircuitBreakerConfig::default(),
85 sizing: SizingConfig {
86 margin_per_trade: 500.0,
87 leverage: 1,
88 max_contracts: 50,
89 },
90 }
91 }
92
93 /// FX: higher leverage is conventional; a tighter sliding breaker.
94 #[must_use]
95 pub fn fx() -> Self {
96 Self {
97 session_pnl: SessionPnlConfig { loss_limit: -100.0 },
98 circuit_breaker: CircuitBreakerConfig {
99 loss_limit: 3,
100 window_secs: 7_200,
101 cooldown_secs: 3_600,
102 },
103 sizing: SizingConfig {
104 margin_per_trade: 1_000.0,
105 leverage: 20,
106 max_contracts: 50,
107 },
108 }
109 }
110
111 /// Dated futures: bigger contracts, moderate leverage, a wider daily cap.
112 #[must_use]
113 pub fn futures() -> Self {
114 Self {
115 session_pnl: SessionPnlConfig { loss_limit: -200.0 },
116 circuit_breaker: CircuitBreakerConfig::default(),
117 sizing: SizingConfig {
118 margin_per_trade: 1_000.0,
119 leverage: 10,
120 max_contracts: 20,
121 },
122 }
123 }
124
125 /// Cash equities: **no leverage** (1×), conservative size.
126 #[must_use]
127 pub fn equity() -> Self {
128 Self {
129 session_pnl: SessionPnlConfig { loss_limit: -200.0 },
130 circuit_breaker: CircuitBreakerConfig::default(),
131 sizing: SizingConfig {
132 margin_per_trade: 1_000.0,
133 leverage: 1,
134 max_contracts: 20,
135 },
136 }
137 }
138
139 /// The starting preset for an [`AssetClass`]: `CryptoSpot` →
140 /// [`Self::crypto_spot`], `Fx` → [`Self::fx`], `Future` → [`Self::futures`],
141 /// `Equity` → [`Self::equity`]; `CryptoPerp` and any other class fall back
142 /// to [`Self::crypto_perp`].
143 #[must_use]
144 pub fn preset_for(class: AssetClass) -> Self {
145 match class {
146 AssetClass::CryptoSpot => Self::crypto_spot(),
147 AssetClass::Fx => Self::fx(),
148 AssetClass::Future => Self::futures(),
149 AssetClass::Equity => Self::equity(),
150 AssetClass::CryptoPerp => Self::crypto_perp(),
151 // `AssetClass` is #[non_exhaustive]: `Other` and any future class
152 // fall back to the crypto-perp shape.
153 _ => Self::crypto_perp(),
154 }
155 }
156}
157
/// What the execution service does when a bracket entry has been filled
/// but its protective stop-loss leg could not be placed.
///
/// A bracket entry is placed *before* its SL + TP legs (the legs are
/// reduce-only and need the position to exist). If the stop-loss leg is
/// then rejected, the bot is holding a position with none of the
/// protection the brain asked for. This policy decides what happens next.
///
/// Configure it with [`BotConfigBuilder::bracket_failure_policy`]. The enum
/// is `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BracketFailurePolicy {
    /// Immediately close the entry with a reduce-only market order
    /// (default). The brain asked for a protected position; if the
    /// framework can't protect it, it doesn't hold it. If the close
    /// itself fails, an error is logged — at that point manual
    /// intervention is required.
    #[default]
    CloseEntry,
    /// Keep the position open and log an error. Use when the host has
    /// its own protective layer (e.g. exchange-side account stops) and
    /// an unprotected entry is acceptable.
    KeepUnprotected,
}
180
/// Configuration for a [`Bot`].
///
/// Construct via [`BotConfig::builder`]. The builder validates every
/// field on [`BotConfigBuilder::build`] and returns `Error::Config` on
/// any violation — the framework never panics on bad config. `Bot::new`
/// does a final brain-count check on top.
///
/// # Example
///
/// ```
/// use std::time::Duration;
/// use rustrade::BotConfig;
///
/// let config = BotConfig::builder()
///     .name("market-maker")
///     .symbols(["BTCUSDT", "ETHUSDT"])
///     .shutdown_timeout(Duration::from_secs(5))
///     .without_signal_handler() // tests + embedded use
///     .build()
///     .unwrap();
///
/// assert_eq!(config.name, "market-maker");
/// assert_eq!(config.symbols.len(), 2);
/// ```
#[derive(Debug, Clone)]
pub struct BotConfig {
    /// Human-readable name used in logs, tracing spans, and supervisor
    /// service identification. Required; the builder rejects a missing or
    /// whitespace-only name.
    pub name: String,
    /// Symbols this bot trades. Every symbol gets a pre-seeded entry in
    /// the risk-state map and the position cache. Must be non-empty —
    /// the position cache and risk-state map would otherwise be empty,
    /// which is a silent footgun.
    pub symbols: Vec<Symbol>,
    /// Maximum time to wait for services to drain on shutdown. Must be
    /// `> 0`; the supervisor's drain logic needs a non-zero deadline.
    /// The builder defaults this to 30 seconds.
    pub shutdown_timeout: Duration,
    /// Whether the supervisor installs its own Ctrl-C / SIGTERM handler.
    /// Disable when the host service drives shutdown via [`BotHandle::shutdown`].
    /// The builder defaults this to `true`.
    pub install_signal_handler: bool,
    /// Capacity of the in-process market-data broadcast bus. Backed by
    /// `tokio::sync::broadcast`, which has **drop-oldest** semantics: a
    /// slow subscriber that falls behind by more than `capacity` events
    /// sees `RecvError::Lagged(n)` and the oldest dropped events are
    /// gone. Size this to absorb the worst-case latency between
    /// publish and slowest subscriber's `recv`. Must be `> 0`; the
    /// builder defaults this to 1024.
    pub market_bus_capacity: usize,
    /// Capacity of the in-process signal broadcast bus. Same drop-oldest
    /// semantics as `market_bus_capacity`. Typically smaller — signals
    /// are emitted ~once per non-Hold decision, far less frequent than
    /// market events. Must be `> 0`; the builder defaults this to 256.
    pub signal_bus_capacity: usize,
    /// On shutdown, attempt to close any open position for each symbol
    /// before exit, using `ExchangeClient::close_position`. Best-effort:
    /// failures are logged but do not propagate. The builder defaults
    /// this to `false`.
    pub close_positions_on_shutdown: bool,
    /// Risk-layer defaults applied to every configured symbol that has no
    /// entry in [`Self::per_symbol_risk`] (and no matching entry in
    /// [`Self::per_class_risk`] when resolved via [`Self::resolve_risk`]).
    pub risk: RiskConfig,
    /// Per-symbol risk overrides. A symbol present here uses its own
    /// [`RiskConfig`] (session-PnL cap, circuit breaker, and sizing) instead
    /// of [`Self::risk`] — e.g. a tighter drawdown cap on a volatile alt, or
    /// a larger size on a flagship symbol. Symbols absent here use the
    /// default. Resolve with [`Self::resolve_risk`].
    pub per_symbol_risk: HashMap<Symbol, RiskConfig>,
    /// Per-[`AssetClass`] risk overrides. A symbol whose
    /// [`InstrumentSpec`](rustrade_core::InstrumentSpec) reports a class present
    /// here uses that class's [`RiskConfig`] — unless a per-symbol override
    /// also exists, which wins. Lets one bot apply crypto-perp / spot / FX /
    /// futures rules side by side. See [`RiskConfig::preset_for`] for starting
    /// presets and [`Self::resolve_risk`] for the precedence.
    pub per_class_risk: HashMap<AssetClass, RiskConfig>,
    /// Account-wide risk applied across **all** symbols: a daily-loss halt,
    /// a max-concurrent-positions cap, and a gross-exposure cap. Complements
    /// the per-symbol [`RiskConfig`] gates. Defaults to all-off (opt-in), so
    /// a bot that doesn't set it behaves exactly as before. The builder
    /// rejects `NaN` for the daily-loss and gross-exposure limits.
    pub portfolio: PortfolioRiskConfig,
    /// What to do when a bracket entry fills but its protective stop-loss
    /// leg fails to place. Defaults to [`BracketFailurePolicy::CloseEntry`]:
    /// the unprotected entry is closed with a reduce-only market order.
    pub bracket_failure_policy: BracketFailurePolicy,
}
263
264impl BotConfig {
265 /// Begin building a config with sensible defaults.
266 pub fn builder() -> BotConfigBuilder {
267 BotConfigBuilder::default()
268 }
269
270 /// The effective [`RiskConfig`] for `symbol`, ignoring asset class: its
271 /// per-symbol override if set, else the bot-wide default ([`Self::risk`]).
272 /// Prefer [`Self::resolve_risk`], which also honours per-class overrides.
273 pub fn risk_for(&self, symbol: &Symbol) -> &RiskConfig {
274 self.per_symbol_risk.get(symbol).unwrap_or(&self.risk)
275 }
276
277 /// The effective [`RiskConfig`] for `symbol` of the given `asset_class`,
278 /// applying the precedence **per-symbol → per-class → default**. The
279 /// framework resolves `asset_class` from
280 /// [`ExchangeClient::instrument_spec`] at startup, so a multi-asset bot
281 /// gets the right rules per symbol without listing every one.
282 pub fn resolve_risk(&self, symbol: &Symbol, asset_class: AssetClass) -> &RiskConfig {
283 self.per_symbol_risk
284 .get(symbol)
285 .or_else(|| self.per_class_risk.get(&asset_class))
286 .unwrap_or(&self.risk)
287 }
288}
289
/// Builder for [`BotConfig`].
///
/// Obtained from [`BotConfig::builder`]. Each setter consumes and returns
/// the builder; [`BotConfigBuilder::build`] validates the collected values
/// and fills in defaults for anything left unset.
#[derive(Debug, Clone, Default)]
pub struct BotConfigBuilder {
    // Required; `build` rejects `None` and whitespace-only names.
    name: Option<String>,
    // Accumulated by `symbol` / `symbols`; `build` rejects an empty list.
    symbols: Vec<Symbol>,
    // `None` → 30 s in `build`.
    shutdown_timeout: Option<Duration>,
    // `None` → `true` in `build`.
    install_signal_handler: Option<bool>,
    // `None` → `DEFAULT_MARKET_BUS_CAPACITY` in `build`.
    market_bus_capacity: Option<usize>,
    // `None` → `DEFAULT_SIGNAL_BUS_CAPACITY` in `build`.
    signal_bus_capacity: Option<usize>,
    // `None` → `false` in `build`.
    close_positions_on_shutdown: Option<bool>,
    // Bot-wide default risk config; starts as `RiskConfig::default()`.
    risk: RiskConfig,
    // Per-symbol overrides registered via `symbol_risk`.
    per_symbol_risk: HashMap<Symbol, RiskConfig>,
    // Per-asset-class overrides registered via `class_risk`.
    per_class_risk: HashMap<AssetClass, RiskConfig>,
    // Account-wide limits; starts as `PortfolioRiskConfig::default()`.
    portfolio: PortfolioRiskConfig,
    // `None` → `BracketFailurePolicy::default()` in `build`.
    bracket_failure_policy: Option<BracketFailurePolicy>,
}
306
307impl BotConfigBuilder {
308 /// Human-readable bot name (logs, supervisor identification). Required.
309 pub fn name(mut self, name: impl Into<String>) -> Self {
310 self.name = Some(name.into());
311 self
312 }
313
314 /// Add a single symbol. Repeated calls accumulate.
315 pub fn symbol(mut self, sym: impl Into<Symbol>) -> Self {
316 self.symbols.push(sym.into());
317 self
318 }
319
320 /// Add many symbols at once. Repeated calls accumulate.
321 pub fn symbols<I, S>(mut self, syms: I) -> Self
322 where
323 I: IntoIterator<Item = S>,
324 S: Into<Symbol>,
325 {
326 self.symbols.extend(syms.into_iter().map(Into::into));
327 self
328 }
329
330 /// Maximum time to wait for services to drain on shutdown.
331 pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
332 self.shutdown_timeout = Some(dur);
333 self
334 }
335
336 /// Disable the supervisor's signal handler — host drives shutdown.
337 pub fn without_signal_handler(mut self) -> Self {
338 self.install_signal_handler = Some(false);
339 self
340 }
341
342 /// Override the in-process market-data bus capacity (default 1024).
343 pub fn market_bus_capacity(mut self, cap: usize) -> Self {
344 self.market_bus_capacity = Some(cap);
345 self
346 }
347
348 /// Override the in-process signal-bus capacity (default 256).
349 pub fn signal_bus_capacity(mut self, cap: usize) -> Self {
350 self.signal_bus_capacity = Some(cap);
351 self
352 }
353
354 /// Enable best-effort `exchange.close_position` for non-flat positions
355 /// after the supervisor drains.
356 pub fn close_positions_on_shutdown(mut self, b: bool) -> Self {
357 self.close_positions_on_shutdown = Some(b);
358 self
359 }
360
361 /// Override the session-PnL config used for every symbol.
362 pub fn session_pnl_config(mut self, cfg: SessionPnlConfig) -> Self {
363 self.risk.session_pnl = cfg;
364 self
365 }
366
367 /// Override the circuit-breaker config used for every symbol.
368 pub fn circuit_breaker_config(mut self, cfg: CircuitBreakerConfig) -> Self {
369 self.risk.circuit_breaker = cfg;
370 self
371 }
372
373 /// Override the position-sizing config used for every symbol.
374 pub fn sizing_config(mut self, cfg: SizingConfig) -> Self {
375 self.risk.sizing = cfg;
376 self
377 }
378
379 /// Set the account-wide [`PortfolioRiskConfig`] — a daily-loss halt, a
380 /// max-concurrent-positions cap, and a gross-exposure cap applied across
381 /// all symbols. Unset, every limit is off (the per-symbol gates still apply).
382 ///
383 /// ```
384 /// use rustrade::{BotConfig, PortfolioRiskConfig};
385 ///
386 /// let cfg = BotConfig::builder()
387 /// .name("bot")
388 /// .symbols(["BTCUSDT", "ETHUSDT"])
389 /// .without_signal_handler()
390 /// .portfolio_config(PortfolioRiskConfig {
391 /// max_daily_loss: -500.0,
392 /// max_concurrent_positions: 3,
393 /// max_gross_exposure: 50_000.0,
394 /// })
395 /// .build()
396 /// .unwrap();
397 /// assert_eq!(cfg.portfolio.max_concurrent_positions, 3);
398 /// ```
399 pub fn portfolio_config(mut self, cfg: PortfolioRiskConfig) -> Self {
400 self.portfolio = cfg;
401 self
402 }
403
404 /// Set the [`BracketFailurePolicy`] — what happens when a bracket entry
405 /// fills but its protective stop-loss leg fails to place. Defaults to
406 /// [`BracketFailurePolicy::CloseEntry`].
407 pub fn bracket_failure_policy(mut self, policy: BracketFailurePolicy) -> Self {
408 self.bracket_failure_policy = Some(policy);
409 self
410 }
411
412 /// Set a per-symbol [`RiskConfig`] override. The given symbol then uses
413 /// this config (session-PnL cap, circuit breaker, sizing) instead of the
414 /// bot-wide default. Repeated calls for the same symbol replace the
415 /// previous override. Symbols without an override use the default.
416 ///
417 /// ```
418 /// use rustrade::{BotConfig, RiskConfig};
419 /// use rustrade::SessionPnlConfig;
420 ///
421 /// let cfg = BotConfig::builder()
422 /// .name("bot")
423 /// .symbols(["BTCUSDT", "DOGEUSDT"])
424 /// .without_signal_handler()
425 /// // tighter daily loss cap on the volatile alt
426 /// .symbol_risk("DOGEUSDT", RiskConfig {
427 /// session_pnl: SessionPnlConfig { loss_limit: -20.0 },
428 /// ..Default::default()
429 /// })
430 /// .build()
431 /// .unwrap();
432 /// assert_eq!(cfg.risk_for(&"DOGEUSDT".into()).session_pnl.loss_limit, -20.0);
433 /// ```
434 pub fn symbol_risk(mut self, symbol: impl Into<Symbol>, cfg: RiskConfig) -> Self {
435 self.per_symbol_risk.insert(symbol.into(), cfg);
436 self
437 }
438
439 /// Set a per-[`AssetClass`] [`RiskConfig`] override. Symbols whose
440 /// instrument reports this class use `cfg` (unless a per-symbol override
441 /// also exists, which wins). See [`RiskConfig::preset_for`] for ready-made
442 /// presets to start from.
443 ///
444 /// ```
445 /// use rustrade::{AssetClass, BotConfig, RiskConfig};
446 ///
447 /// let cfg = BotConfig::builder()
448 /// .name("multi-asset")
449 /// .symbols(["XBTUSDTM", "EURUSD"])
450 /// .without_signal_handler()
451 /// .class_risk(AssetClass::CryptoPerp, RiskConfig::crypto_perp())
452 /// .class_risk(AssetClass::Fx, RiskConfig::fx())
453 /// .build()
454 /// .unwrap();
455 /// // FX symbols resolve to the FX preset (20× leverage) absent a
456 /// // per-symbol override.
457 /// assert_eq!(
458 /// cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx).sizing.leverage,
459 /// 20
460 /// );
461 /// ```
462 pub fn class_risk(mut self, class: AssetClass, cfg: RiskConfig) -> Self {
463 self.per_class_risk.insert(class, cfg);
464 self
465 }
466
467 /// Validate and build. Returns `Error::Config` on any constraint
468 /// violation — the framework never panics on bad config.
469 pub fn build(self) -> Result<BotConfig> {
470 let name = self
471 .name
472 .filter(|n| !n.trim().is_empty())
473 .ok_or_else(|| Error::config("BotConfig.name is required and must not be empty"))?;
474
475 if self.symbols.is_empty() {
476 return Err(Error::config(
477 "BotConfig.symbols must contain at least one Symbol — \
478 the position cache and risk-state map are pre-seeded per symbol",
479 ));
480 }
481
482 let market_bus_capacity = self
483 .market_bus_capacity
484 .unwrap_or(DEFAULT_MARKET_BUS_CAPACITY);
485 if market_bus_capacity == 0 {
486 return Err(Error::config(
487 "BotConfig.market_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
488 ));
489 }
490
491 let signal_bus_capacity = self
492 .signal_bus_capacity
493 .unwrap_or(DEFAULT_SIGNAL_BUS_CAPACITY);
494 if signal_bus_capacity == 0 {
495 return Err(Error::config(
496 "BotConfig.signal_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
497 ));
498 }
499
500 let shutdown_timeout = self.shutdown_timeout.unwrap_or(Duration::from_secs(30));
501 if shutdown_timeout.is_zero() {
502 return Err(Error::config(
503 "BotConfig.shutdown_timeout must be > 0 — drain needs a non-zero deadline",
504 ));
505 }
506
507 // Validate the default risk config and every per-symbol / per-class
508 // override with the same rules — bad config never reaches the runtime.
509 validate_risk(&self.risk, "BotConfig.risk")?;
510 for (sym, cfg) in &self.per_symbol_risk {
511 validate_risk(cfg, &format!("BotConfig.per_symbol_risk[{sym}]"))?;
512 }
513 for (class, cfg) in &self.per_class_risk {
514 validate_risk(cfg, &format!("BotConfig.per_class_risk[{class:?}]"))?;
515 }
516 validate_portfolio(&self.portfolio)?;
517
518 Ok(BotConfig {
519 name,
520 symbols: self.symbols,
521 shutdown_timeout,
522 install_signal_handler: self.install_signal_handler.unwrap_or(true),
523 market_bus_capacity,
524 signal_bus_capacity,
525 close_positions_on_shutdown: self.close_positions_on_shutdown.unwrap_or(false),
526 risk: self.risk,
527 per_symbol_risk: self.per_symbol_risk,
528 per_class_risk: self.per_class_risk,
529 portfolio: self.portfolio,
530 bracket_failure_policy: self.bracket_failure_policy.unwrap_or_default(),
531 })
532 }
533}
534
535/// Validate a [`RiskConfig`] (the default or a per-symbol override).
536/// `ctx` names the offending field in the error.
537fn validate_risk(risk: &RiskConfig, ctx: &str) -> Result<()> {
538 if risk.session_pnl.loss_limit.is_nan() {
539 return Err(Error::config(format!(
540 "{ctx}.session_pnl.loss_limit must not be NaN"
541 )));
542 }
543 if !risk.sizing.margin_per_trade.is_finite() || risk.sizing.margin_per_trade < 0.0 {
544 return Err(Error::config(format!(
545 "{ctx}.sizing.margin_per_trade must be a finite non-negative number"
546 )));
547 }
548 Ok(())
549}
550
551/// Validate the account-wide [`PortfolioRiskConfig`]. `NaN` limits would make
552/// every comparison silently false (a disabled limit that looks enabled), so
553/// they're rejected; `±∞` is allowed as the explicit "disabled" sentinel.
554fn validate_portfolio(p: &PortfolioRiskConfig) -> Result<()> {
555 if p.max_daily_loss.is_nan() {
556 return Err(Error::config(
557 "BotConfig.portfolio.max_daily_loss must not be NaN (use f64::NEG_INFINITY to disable)",
558 ));
559 }
560 if p.max_gross_exposure.is_nan() {
561 return Err(Error::config(
562 "BotConfig.portfolio.max_gross_exposure must not be NaN (use f64::INFINITY to disable)",
563 ));
564 }
565 Ok(())
566}
567
/// The embedded trading bot.
///
/// Owns a [`Supervisor`], an [`ExchangeClient`], one or more [`Brain`]s,
/// and the in-process [`MarketDataBus`]. Created via [`Bot::new`]; run
/// via [`Bot::run_until_shutdown`]; observed and steered via the
/// [`BotHandle`] returned from [`Bot::handle`].
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
/// use rustrade::{Bot, BotConfig};
/// # struct MyBrain;
/// # #[async_trait::async_trait]
/// # impl rustrade_core::Brain for MyBrain {
/// #     fn name(&self) -> &str { "x" }
/// #     async fn on_event(&self, _: &rustrade_core::MarketDataEvent, _: &rustrade_core::Position)
/// #         -> rustrade_core::Result<rustrade_core::Decision> {
/// #         Ok(rustrade_core::Decision::hold())
/// #     }
/// #     async fn health(&self) -> rustrade_core::BrainHealth { rustrade_core::BrainHealth::ok() }
/// # }
/// # async fn run(exchange: Arc<dyn rustrade_core::ExchangeClient>) -> anyhow::Result<()> {
/// let config = BotConfig::builder()
///     .name("my-bot")
///     .symbol("BTCUSDT")
///     .build()?;
///
/// let bot = Bot::new(config, exchange, vec![Arc::new(MyBrain) as Arc<dyn rustrade_core::Brain>])?;
/// let handle = bot.handle();
///
/// // Spawn the bot; ask it to shut down from elsewhere.
/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
/// handle.shutdown();
/// task.await??;
/// # Ok(())
/// # }
/// ```
pub struct Bot {
    // Validated configuration; exposed read-only via `Bot::config`.
    config: BotConfig,
    // Built in `Bot::new` from the configured shutdown timeout and
    // signal-handler flag; shared with the `BotHandle`.
    supervisor: Arc<Supervisor>,
    // Exchange adapter passed to `Bot::new`.
    exchange: Arc<dyn ExchangeClient>,
    // Brain set passed to `Bot::new` (checked to be non-empty); shared
    // with the `BotHandle`.
    brains: Arc<Vec<Arc<dyn Brain>>>,
    // Market-data bus sized by `config.market_bus_capacity`.
    market_bus: MarketDataBus,
    // Signal bus sized by `config.signal_bus_capacity`.
    signal_bus: SignalBus,
    // Position cache built from `config.symbols`.
    positions: PositionCache,
    // Per-symbol risk state built from `config.symbols`, each symbol using
    // its resolved session-PnL and circuit-breaker config.
    risk: RiskStateMap,
    // Metrics sink; `NoopSink` until replaced via `with_metrics`.
    metrics: Arc<dyn MetricsSink>,
    // Optional persistence backend; `None` until `with_state_store`.
    state_store: Option<Arc<dyn StateStore>>,
    // Write-once slot shared with the `BotHandle`.
    // NOTE(review): presumably filled with the risk persister once the bot
    // runs — confirm against `run_until_shutdown`.
    persister_slot: crate::handle::PersisterSlot,
    // The handle handed out (cloned) by `Bot::handle`.
    handle: BotHandle,
    // Optional host-owned cancellation token; set by `with_external_cancel`.
    external_cancel: Option<CancellationToken>,
    // Optional market source; set by `with_market_source`.
    market_source: Option<Arc<dyn MarketSource>>,
    // Optional fill source; set by `with_fill_source`.
    fill_source: Option<Arc<dyn FillSource>>,
    // One entry per `with_candle_poller` call.
    candle_pollers: Vec<CandlePollerSpec>,
    // Resting-order tracker created in `Bot::new`; shared with the
    // `BotHandle`.
    order_tracker: OrderTracker,
    // Order-tracking settings; `None` until `with_order_tracking`.
    order_tracking: Option<OrderTrackingSpec>,
}
626
/// Settings captured by `Bot::with_order_tracking`.
struct OrderTrackingSpec {
    // Age beyond which a resting order is cancelled.
    ttl: Duration,
    // How often a sweep runs.
    poll_cadence: Duration,
}
632
/// One registered `(symbol, interval, cadence)` for `Bot::with_candle_poller`.
struct CandlePollerSpec {
    // Candle source to poll.
    source: Arc<dyn CandleSource>,
    // Symbol whose candles are polled.
    symbol: Symbol,
    // Candle interval requested from the source.
    interval: Duration,
    // How often the source is polled.
    poll_cadence: Duration,
    // NOTE(review): presumably the maximum number of candles requested per
    // poll — confirm against `CandlePollerService`.
    limit: usize,
}
641
642impl Bot {
643 /// Construct a `Bot`. Validates that at least one brain is provided.
644 ///
645 /// The exchange client and brain set are immutable for the bot's
646 /// lifetime — to change them, build a new `Bot`.
647 pub fn new(
648 config: BotConfig,
649 exchange: Arc<dyn ExchangeClient>,
650 brains: Vec<Arc<dyn Brain>>,
651 ) -> Result<Self> {
652 if brains.is_empty() {
653 return Err(Error::config(
654 "Bot::new requires at least one Brain — empty brain list",
655 ));
656 }
657
658 // Multi-brain arbitration guard: no two brains may claim ownership of
659 // the same symbol via `Brain::owned_symbols` — that would let two
660 // strategies fight over one position. Brains returning `None` opt out
661 // and self-coordinate (current behaviour).
662 let mut claimed: HashMap<Symbol, String> = HashMap::new();
663 for brain in &brains {
664 let Some(syms) = brain.owned_symbols() else {
665 continue;
666 };
667 for sym in syms.into_iter().collect::<std::collections::HashSet<_>>() {
668 if let Some(other) = claimed.insert(sym.clone(), brain.name().to_string()) {
669 return Err(Error::config(format!(
670 "brains '{other}' and '{}' both declare ownership of {sym} — \
671 overlapping owned_symbols would fight over one position",
672 brain.name()
673 )));
674 }
675 }
676 }
677
678 let supervisor = Arc::new(Supervisor::new(
679 SupervisorConfig::default()
680 .with_shutdown_timeout(config.shutdown_timeout)
681 .with_default_backoff(Default::default())
682 .pipe(|c| {
683 if config.install_signal_handler {
684 c
685 } else {
686 c.without_signal_handler()
687 }
688 }),
689 ));
690
691 let market_bus = MarketDataBus::with_capacity(config.market_bus_capacity);
692 let signal_bus = SignalBus::with_capacity(config.signal_bus_capacity);
693 let positions = build_position_cache(&config.symbols);
694 let risk = build_risk_state(&config.symbols, |sym| {
695 // Resolve per-symbol → per-class (by the adapter's asset class) →
696 // default, so a multi-asset bot gets the right gates per symbol.
697 let class = exchange.instrument_spec(sym).asset_class;
698 let r = config.resolve_risk(sym, class);
699 (r.session_pnl.clone(), r.circuit_breaker.clone())
700 });
701
702 let brains = Arc::new(brains);
703 let persister_slot: crate::handle::PersisterSlot = Arc::new(std::sync::OnceLock::new());
704 let order_tracker = OrderTracker::new();
705 let handle = BotHandle::new(
706 supervisor.clone(),
707 brains.clone(),
708 risk.clone(),
709 positions.clone(),
710 signal_bus.clone(),
711 persister_slot.clone(),
712 order_tracker.clone(),
713 );
714
715 Ok(Self {
716 config,
717 supervisor,
718 exchange,
719 brains,
720 market_bus,
721 signal_bus,
722 positions,
723 risk,
724 metrics: Arc::new(NoopSink),
725 state_store: None,
726 persister_slot,
727 order_tracker,
728 handle,
729 external_cancel: None,
730 market_source: None,
731 fill_source: None,
732 candle_pollers: Vec::new(),
733 order_tracking: None,
734 })
735 }
736
737 /// Install a [`MetricsSink`]. The framework's services emit
738 /// counters and histograms to this sink on every observable event;
739 /// the default is [`NoopSink`], which discards everything.
740 pub fn with_metrics(mut self, sink: Arc<dyn MetricsSink>) -> Self {
741 self.metrics = sink;
742 self
743 }
744
745 /// Install a [`StateStore`] so per-symbol risk state (session PnL and
746 /// circuit breaker) survives restarts.
747 ///
748 /// Without a store, risk state is in-memory only: a crash mid-session
749 /// resets the daily drawdown cap and the loss-streak breaker. With one
750 /// wired, the bot:
751 ///
752 /// - **Restores** each symbol's snapshot on startup, then applies the
753 /// stale-snapshot policy — a session from an earlier UTC day rolls
754 /// over to fresh, and a breaker whose cooldown elapsed during
755 /// downtime auto-resets.
756 /// - **Persists** after every realised trade (whether fed via
757 /// [`BotHandle::record_trade_outcome`](crate::BotHandle::record_trade_outcome)
758 /// or auto-routed by the `FillRoutingService`).
759 /// - **Flushes** on graceful shutdown.
760 ///
761 /// Use [`rustrade_core::InMemoryStore`] for a non-durable default, or a
762 /// disk-/database-backed implementation from a downstream crate for
763 /// real durability. Snapshots are keyed by `(bot name, symbol)`, so
764 /// distinct bots can share one backend without collision.
765 pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
766 self.state_store = Some(store);
767 self
768 }
769
770 /// Enable resting-order lifecycle tracking.
771 ///
772 /// The [`ExecutionService`] records every resting order it places (limit
773 /// / post-only / IOC / FOK — market orders never rest, so they're
774 /// skipped), and a supervised [`OrderReaperService`] periodically:
775 ///
776 /// - **reconciles** the tracker against `exchange.get_open_orders` (so
777 /// orders filled or cancelled out-of-band stop being tracked), and
778 /// - **cancels** any resting order older than `ttl` via
779 /// `exchange.cancel_order`.
780 ///
781 /// `poll_cadence` is how often a sweep runs. The reaper is only spawned
782 /// if the adapter advertises [`Capability::OrderTracking`]; otherwise the
783 /// call is a no-op (with a warning at startup), since the framework can't
784 /// list or cancel orders without it. Live tracked orders are visible via
785 /// [`BotHandle::tracked_orders`](crate::BotHandle::tracked_orders).
786 pub fn with_order_tracking(mut self, ttl: Duration, poll_cadence: Duration) -> Self {
787 self.order_tracking = Some(OrderTrackingSpec { ttl, poll_cadence });
788 self
789 }
790
791 /// Register a [`CandleSource`] to be polled every `poll_cadence` for
792 /// `(symbol, interval)`. Polled candles are deduplicated by
793 /// timestamp and published to the bot's [`MarketDataBus`]. Repeated
794 /// calls accumulate — one supervised service per registered tuple.
795 pub fn with_candle_poller(
796 mut self,
797 source: Arc<dyn CandleSource>,
798 symbol: impl Into<Symbol>,
799 interval: Duration,
800 poll_cadence: Duration,
801 limit: usize,
802 ) -> Self {
803 self.candle_pollers.push(CandlePollerSpec {
804 source,
805 symbol: symbol.into(),
806 interval,
807 poll_cadence,
808 limit,
809 });
810 self
811 }
812
813 /// Tie this bot's shutdown to an externally-owned cancellation token.
814 ///
815 /// When the external token is cancelled, the bot's supervisor token
816 /// is cancelled too — equivalent to calling [`BotHandle::shutdown`]
817 /// but without spawning a linker task in the host.
818 ///
819 /// The reverse is not true: cancelling the bot does not cancel the
820 /// external token.
821 pub fn with_external_cancel(mut self, token: CancellationToken) -> Self {
822 self.external_cancel = Some(token);
823 self
824 }
825
826 /// Attach a [`MarketSource`] to be driven by a supervised
827 /// [`MarketFeedService`]. Source implementors are responsible for
828 /// publishing to the bot's [`MarketDataBus`] (obtain via
829 /// `bot.market_data_bus().clone()` before constructing the source).
830 pub fn with_market_source(mut self, source: Arc<dyn MarketSource>) -> Self {
831 self.market_source = Some(source);
832 self
833 }
834
835 /// Attach a [`FillSource`] to be driven by a supervised
836 /// [`FillRoutingService`]. Fills are routed to every brain via
837 /// `Brain::on_fill` and the position cache is refreshed from the
838 /// exchange after each one.
839 pub fn with_fill_source(mut self, source: Arc<dyn FillSource>) -> Self {
840 self.fill_source = Some(source);
841 self
842 }
843
844 /// Cheap cloneable handle for host services. Can be obtained at any
845 /// point — call before [`Self::run_until_shutdown`] so the host can
846 /// drive shutdown while the bot is running.
847 pub fn handle(&self) -> BotHandle {
848 self.handle.clone()
849 }
850
851 /// Reference to the bot's configuration.
852 pub fn config(&self) -> &BotConfig {
853 &self.config
854 }
855
856 /// Borrow the in-process market-data bus. Host services and adapters
857 /// publish here; the bot's framework services subscribe.
858 pub fn market_data_bus(&self) -> &MarketDataBus {
859 &self.market_bus
860 }
861
862 /// Borrow the in-process signal bus. The execution service publishes
863 /// a [`Signal`](rustrade_core::Signal) to this bus on every
864 /// non-`Hold` decision the brain emits; host services subscribe via
865 /// [`BotHandle::subscribe_signals`].
866 pub fn signal_bus(&self) -> &SignalBus {
867 &self.signal_bus
868 }
869
870 /// Spawn the framework services and run until shutdown.
871 ///
872 /// Returns after all spawned services have drained (or the configured
873 /// shutdown timeout elapses). Consumes `self` to make the
874 /// "construct → run → exit" lifecycle explicit; persistent observation
875 /// of the running bot is done via the [`BotHandle`] obtained earlier.
876 ///
877 /// # Runtime requirements
878 ///
879 /// - **Multi-thread tokio runtime.** The supervisor spawns each
880 /// service onto `tokio::spawn`. A current-thread runtime works
881 /// for small loads but loses the per-service parallelism the
882 /// framework is designed for. Use
883 /// `#[tokio::main(flavor = "multi_thread")]` or
884 /// `tokio::runtime::Builder::new_multi_thread()` in the host.
885 /// - **`tokio::spawn` is used internally.** Anywhere the host
886 /// embeds this method, a tokio runtime context must be active.
887 /// - **No nested runtimes.** `Bot::run_until_shutdown` is async; do
888 /// not call `block_on` on it from inside another runtime.
889 ///
890 /// # Resource expectations
891 ///
892 /// - **Memory per active symbol:** O(few hundred bytes) for the
893 /// position cache entry, `SymbolRisk` (a `SessionPnl` plus a
894 /// `CircuitBreaker` whose ring buffer is bounded by
895 /// `loss_limit`), plus the per-symbol slot in any host-owned
896 /// subscriber.
897 /// - **Channel buffers:** `market_bus_capacity` + `signal_bus_capacity`
898 /// slots per bus, each slot holding a clone of `MarketDataEvent` /
899 /// `Signal`. Drop-oldest semantics — back-pressure is *not*
900 /// propagated to publishers.
901 /// - **Expected shutdown time:** ≤ `shutdown_timeout`. A
902 /// well-behaved service responds to its cancel token in
903 /// milliseconds; the timeout is the worst-case bound, not the
904 /// typical case.
905 /// - **Restart-after-crash latency:** bounded by
906 /// [`BackoffConfig`](rustrade_supervisor::BackoffConfig). Defaults:
907 /// 100 ms base, 60 s cap, 10 retries within a 10-minute window
908 /// before the circuit breaker trips.
909 pub async fn run_until_shutdown(self) -> anyhow::Result<()> {
910 tracing::info!(
911 bot = %self.config.name,
912 brains = self.brains.len(),
913 symbols = self.config.symbols.len(),
914 exchange = %self.exchange.name(),
915 "rustrade Bot starting"
916 );
917
918 // Best-effort position prefetch — failures don't block startup.
919 self.prefetch_positions().await;
920
921 // Restore persisted risk state (if a store is wired) before any
922 // service runs, then publish the persister so the per-trade paths
923 // can save through it.
924 let persister = self
925 .state_store
926 .clone()
927 .map(|store| RiskPersister::new(store, self.config.name.clone()));
928 if let Some(p) = &persister {
929 p.restore_into(&self.risk).await;
930 // OnceLock: set is infallible the first (only) time per bot.
931 let _ = self.persister_slot.set(p.clone());
932 }
933
934 // Order tracking: only active when wired AND the adapter can list +
935 // cancel orders. Gate once here so both the execution service (which
936 // records resting orders) and the reaper agree.
937 let order_tracking_active =
938 self.order_tracking.is_some() && self.exchange.supports(Capability::OrderTracking);
939 if self.order_tracking.is_some() && !order_tracking_active {
940 tracing::warn!(
941 exchange = %self.exchange.name(),
942 "order tracking requested but adapter lacks Capability::OrderTracking — \
943 resting orders will NOT be tracked or aged out"
944 );
945 }
946
947 // Bracket (SL+TP / OCO) orders need three things: place protective
948 // stops (StopOrders), cancel the sibling on fill (OrderTracking), and
949 // detect that fill (a fill source). When all hold, a shared OCO
950 // registry links each bracket's legs; otherwise a brain emitting both
951 // SL+TP falls back to a single attached stop-loss (see
952 // `ExecutionService::attach_protection`).
953 let brackets_active = self.exchange.supports(Capability::StopOrders)
954 && self.exchange.supports(Capability::OrderTracking)
955 && self.fill_source.is_some();
956 let oco = brackets_active.then(crate::order_tracker::OcoRegistry::new);
957 tracing::info!(brackets_active, "bracket (SL+TP/OCO) support");
958
959 // Resolve each symbol's effective sizing (per-symbol → per-class by
960 // asset class → default) so multi-asset bots size each class correctly.
961 let sizing = Arc::new(crate::execution::SymbolSizing::new(
962 self.config.risk.sizing.clone(),
963 self.config
964 .symbols
965 .iter()
966 .map(|s| {
967 let class = self.exchange.instrument_spec(s).asset_class;
968 (s.clone(), self.config.resolve_risk(s, class).sizing.clone())
969 })
970 .collect(),
971 ));
972 // Account-wide portfolio risk, shared between the execution gate
973 // (reads it) and the risk sweep (maintains its daily-loss latch).
974 let portfolio: PortfolioRiskState = build_portfolio_risk(self.config.portfolio.clone());
975
976 // Pending-entry reservations shared by every execution service and
977 // the fill router — makes the portfolio gate check-and-reserve.
978 let pending = crate::pending::PendingEntryLedger::new(PENDING_ENTRY_TTL);
979
980 let ctx = ExecutionContext {
981 exchange: self.exchange.clone(),
982 bus: self.market_bus.clone(),
983 signals: self.signal_bus.clone(),
984 positions: self.positions.clone(),
985 risk: self.risk.clone(),
986 portfolio: portfolio.clone(),
987 sizing,
988 order_tracker: order_tracking_active.then(|| self.order_tracker.clone()),
989 oco: oco.clone(),
990 bracket_failure_policy: self.config.bracket_failure_policy,
991 pending: pending.clone(),
992 };
993
994 for brain in self.brains.iter() {
995 let svc = ExecutionService::new(brain.clone(), ctx.clone());
996 self.supervisor.spawn_service(Box::new(svc));
997 }
998
999 // Periodic risk sweep: rolls per-symbol SessionPnl / CircuitBreaker and
1000 // the portfolio halt over at 00:00 UTC during a live run (without it,
1001 // tick() only fires on restart), and re-derives the account daily-loss
1002 // latch from the per-symbol session PnLs.
1003 self.supervisor
1004 .spawn_service(Box::new(RiskSweepService::new(
1005 self.risk.clone(),
1006 portfolio.clone(),
1007 RISK_SWEEP_CADENCE,
1008 )));
1009
1010 if order_tracking_active {
1011 // Safe: order_tracking_active implies order_tracking is Some.
1012 let spec = self.order_tracking.as_ref().unwrap();
1013 self.supervisor
1014 .spawn_service(Box::new(OrderReaperService::new(
1015 self.exchange.clone(),
1016 self.order_tracker.clone(),
1017 self.config.symbols.clone(),
1018 spec.ttl,
1019 spec.poll_cadence,
1020 self.metrics.clone(),
1021 )));
1022 }
1023
1024 if let Some(source) = self.market_source.clone() {
1025 self.supervisor
1026 .spawn_service(Box::new(MarketFeedService::new(source)));
1027 }
1028
1029 if let Some(source) = self.fill_source.clone() {
1030 self.supervisor
1031 .spawn_service(Box::new(FillRoutingService::new(
1032 source,
1033 self.brains.clone(),
1034 self.exchange.clone(),
1035 self.positions.clone(),
1036 self.risk.clone(),
1037 self.metrics.clone(),
1038 persister.clone(),
1039 oco.clone(),
1040 pending.clone(),
1041 )));
1042 }
1043
1044 for spec in &self.candle_pollers {
1045 self.supervisor
1046 .spawn_service(Box::new(CandlePollerService::new(
1047 spec.source.clone(),
1048 spec.symbol.clone(),
1049 spec.interval,
1050 spec.poll_cadence,
1051 spec.limit,
1052 self.market_bus.clone(),
1053 self.metrics.clone(),
1054 )));
1055 }
1056
1057 // External cancellation linker: when the host's token fires,
1058 // cancel the supervisor's root token. The reverse is not wired.
1059 if let Some(external) = self.external_cancel.clone() {
1060 let supervisor = self.supervisor.clone();
1061 tokio::spawn(async move {
1062 external.cancelled().await;
1063 tracing::info!("external cancellation received; triggering bot shutdown");
1064 supervisor.trigger_shutdown();
1065 });
1066 }
1067
1068 let run_result = self.supervisor.run_until_shutdown().await;
1069
1070 if self.config.close_positions_on_shutdown {
1071 self.close_open_positions().await;
1072 }
1073
1074 // Persist final risk state + flush before exit so the next boot
1075 // restores an up-to-date snapshot.
1076 if let Some(p) = &persister {
1077 p.persist_all(&self.risk).await;
1078 }
1079
1080 for brain in self.brains.iter() {
1081 let health = brain.health().await;
1082 tracing::info!(
1083 brain = %brain.name(),
1084 healthy = health.healthy,
1085 events = health.events_processed,
1086 non_hold = health.non_hold_decisions,
1087 "final brain health"
1088 );
1089 }
1090
1091 tracing::info!(bot = %self.config.name, "rustrade Bot exited");
1092 run_result
1093 }
1094
1095 async fn prefetch_positions(&self) {
1096 for symbol in &self.config.symbols {
1097 match self.exchange.get_position(symbol).await {
1098 Ok(pos) => {
1099 self.positions.write().await.insert(symbol.clone(), pos);
1100 tracing::debug!(
1101 symbol = %symbol,
1102 qty = pos.qty,
1103 "prefetched position from exchange"
1104 );
1105 }
1106 Err(e) => {
1107 tracing::warn!(
1108 symbol = %symbol,
1109 error = %e,
1110 "failed to prefetch position; cache defaults to FLAT"
1111 );
1112 }
1113 }
1114 }
1115 }
1116
1117 async fn close_open_positions(&self) {
1118 let snapshot: Vec<(Symbol, Position)> = {
1119 let map = self.positions.read().await;
1120 map.iter()
1121 .filter(|(_, p)| !p.is_flat())
1122 .map(|(s, p)| (s.clone(), *p))
1123 .collect()
1124 };
1125
1126 if snapshot.is_empty() {
1127 tracing::info!("close_positions_on_shutdown: no open positions");
1128 return;
1129 }
1130
1131 for (symbol, position) in snapshot {
1132 match self.exchange.close_position(&symbol, &position).await {
1133 Ok(order_id) => tracing::info!(
1134 symbol = %symbol,
1135 qty = position.qty,
1136 order_id = %order_id,
1137 "close_positions_on_shutdown: closed"
1138 ),
1139 Err(e) => tracing::error!(
1140 symbol = %symbol,
1141 qty = position.qty,
1142 error = %e,
1143 "close_positions_on_shutdown: failed (best-effort)"
1144 ),
1145 }
1146 }
1147 }
1148}
1149
// Module-local `pipe` combinator for builder ergonomics: lets `Bot::new`
// apply builder methods conditionally inside one expression chain.
trait Pipe: Sized {
    /// Pass `self` through `f` and return the result.
    fn pipe<F>(self, f: F) -> Self
    where
        F: FnOnce(Self) -> Self,
    {
        f(self)
    }
}

// Blanket impl: every sized type gets `.pipe(..)`.
impl<T> Pipe for T {}
1158
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use rustrade_core::{Decision, Fill, MarketDataEvent, Order, Position};

    // ------------------------------------------------------------------
    // Risk presets and resolution
    // ------------------------------------------------------------------

    /// Every asset-class preset carries its own leverage.
    #[test]
    fn presets_differ_in_leverage_by_class() {
        let cases = [
            (RiskConfig::crypto_perp(), 5),
            (RiskConfig::crypto_spot(), 1),
            (RiskConfig::fx(), 20),
            (RiskConfig::futures(), 10),
            (RiskConfig::equity(), 1),
        ];
        for (preset, leverage) in cases {
            assert_eq!(preset.sizing.leverage, leverage);
        }
    }

    /// `preset_for` picks the matching preset per class and falls back to
    /// the perp shape for anything else.
    #[test]
    fn preset_for_maps_each_class_with_perp_fallback() {
        let cases = [
            (AssetClass::CryptoSpot, 1),
            (AssetClass::Fx, 20),
            (AssetClass::Future, 10),
            (AssetClass::Equity, 1),
            // CryptoPerp and the catch-all (`Other` / future classes) → perp shape.
            (AssetClass::CryptoPerp, 5),
            (AssetClass::Other, 5),
        ];
        for (class, leverage) in cases {
            assert_eq!(RiskConfig::preset_for(class).sizing.leverage, leverage);
        }
    }

    /// Resolution order: per-symbol override, then per-class override,
    /// then the bot-wide default.
    #[test]
    fn resolve_risk_precedence_symbol_then_class_then_default() {
        let eth_override = RiskConfig {
            sizing: SizingConfig {
                margin_per_trade: 1.0,
                leverage: 7,
                max_contracts: 1,
            },
            ..Default::default()
        };
        let config = BotConfig::builder()
            .name("t")
            .symbols(["XBTUSDTM", "EURUSD", "ETHUSDTM"])
            .without_signal_handler()
            .class_risk(AssetClass::Fx, RiskConfig::fx())
            .symbol_risk("ETHUSDTM", eth_override)
            .build()
            .unwrap();

        // Per-symbol override wins even when a class override would also match.
        let eth = Symbol::from("ETHUSDTM");
        assert_eq!(config.resolve_risk(&eth, AssetClass::Fx).sizing.leverage, 7);

        // No per-symbol override → the per-class override applies.
        let eur = Symbol::from("EURUSD");
        assert_eq!(
            config.resolve_risk(&eur, AssetClass::Fx).sizing.leverage,
            20
        );

        // Neither → the bot-wide default.
        let xbt = Symbol::from("XBTUSDTM");
        let expected_default = RiskConfig::default().sizing.leverage;
        assert_eq!(
            config
                .resolve_risk(&xbt, AssetClass::CryptoPerp)
                .sizing
                .leverage,
            expected_default
        );
    }

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Brain that always holds and claims no symbols.
    struct NoopBrain;

    #[async_trait]
    impl Brain for NoopBrain {
        fn name(&self) -> &str {
            "noop"
        }

        async fn on_event(
            &self,
            _event: &MarketDataEvent,
            _position: &Position,
        ) -> Result<Decision> {
            Ok(Decision::hold())
        }
    }

    /// Brain that declares ownership of a fixed symbol set, for the
    /// multi-brain arbitration guard tests.
    struct OwningBrain {
        name: &'static str,
        owns: Vec<Symbol>,
    }

    #[async_trait]
    impl Brain for OwningBrain {
        fn name(&self) -> &str {
            self.name
        }

        fn owned_symbols(&self) -> Option<Vec<Symbol>> {
            Some(self.owns.clone())
        }

        async fn on_event(
            &self,
            _event: &MarketDataEvent,
            _position: &Position,
        ) -> Result<Decision> {
            Ok(Decision::hold())
        }
    }

    /// Exchange stub: every call succeeds with a fixed, empty-ish answer.
    struct NoopExchange;

    #[async_trait]
    impl ExchangeClient for NoopExchange {
        fn name(&self) -> &str {
            "noop"
        }

        async fn place_order(&self, _order: &Order) -> Result<String> {
            Ok(String::from("noop-1"))
        }

        async fn cancel_all(&self, _symbol: &Symbol) -> Result<usize> {
            Ok(0)
        }

        async fn close_position(&self, _symbol: &Symbol, _position: &Position) -> Result<String> {
            Ok(String::from("noop-close"))
        }

        async fn get_position(&self, _symbol: &Symbol) -> Result<Position> {
            Ok(Position::FLAT)
        }

        async fn get_balance(&self, _currency: &str) -> Result<f64> {
            Ok(0.0)
        }
    }

    /// Minimal valid single-symbol config with the signal handler disabled.
    fn cfg() -> BotConfig {
        BotConfig::builder()
            .name("test")
            .symbol("BTCUSDT")
            .without_signal_handler()
            .build()
            .unwrap()
    }

    /// Two-symbol config shared by the ownership-guard tests.
    fn two_symbol_cfg() -> BotConfig {
        BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .without_signal_handler()
            .build()
            .unwrap()
    }

    /// Build an [`OwningBrain`] that claims the given symbols.
    fn owning_brain(name: &'static str, owns: &[&'static str]) -> Arc<OwningBrain> {
        Arc::new(OwningBrain {
            name,
            owns: owns.iter().map(|s| Symbol::from(*s)).collect(),
        })
    }

    // ------------------------------------------------------------------
    // BotConfig builder validation
    // ------------------------------------------------------------------

    #[test]
    fn builder_requires_name() {
        let outcome = BotConfig::builder().build();
        let err = outcome.unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_rejects_blank_name() {
        let outcome = BotConfig::builder().name("   ").build();
        let err = outcome.unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_rejects_zero_market_bus_capacity() {
        let builder = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .market_bus_capacity(0);
        let rejection = builder.build().unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    #[test]
    fn builder_rejects_zero_signal_bus_capacity() {
        let builder = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .signal_bus_capacity(0);
        let rejection = builder.build().unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    #[test]
    fn builder_rejects_empty_symbol_list() {
        // A name but no symbols.
        let rejection = BotConfig::builder().name("x").build().unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    #[test]
    fn builder_rejects_zero_shutdown_timeout() {
        let builder = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .shutdown_timeout(Duration::ZERO);
        let rejection = builder.build().unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    #[test]
    fn builder_rejects_nan_loss_limit() {
        let bad_pnl = SessionPnlConfig {
            loss_limit: f64::NAN,
        };
        let rejection = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .session_pnl_config(bad_pnl)
            .build()
            .unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    #[test]
    fn builder_rejects_non_finite_margin() {
        let bad_sizing = SizingConfig {
            margin_per_trade: f64::INFINITY,
            leverage: 1,
            max_contracts: 1,
        };
        let rejection = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .sizing_config(bad_sizing)
            .build()
            .unwrap_err();
        assert!(matches!(rejection, Error::Config(_)));
    }

    /// `symbol` and `symbols` append in call order.
    #[test]
    fn builder_accumulates_symbols() {
        let config = BotConfig::builder()
            .name("x")
            .symbol("A")
            .symbols(["B", "C"])
            .build()
            .unwrap();
        let symbols = &config.symbols;
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0], Symbol::new("A"));
        assert_eq!(symbols[2], Symbol::new("C"));
    }

    #[test]
    fn builder_accepts_risk_overrides() {
        let pnl = SessionPnlConfig { loss_limit: -123.0 };
        let sizing = SizingConfig {
            margin_per_trade: 250.0,
            leverage: 10,
            max_contracts: 5,
        };
        let config = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .session_pnl_config(pnl)
            .sizing_config(sizing)
            .build()
            .unwrap();
        assert_eq!(config.risk.session_pnl.loss_limit, -123.0);
        assert_eq!(config.risk.sizing.leverage, 10);
    }

    #[test]
    fn per_symbol_risk_override_and_fallback() {
        let btc_override = RiskConfig {
            session_pnl: SessionPnlConfig { loss_limit: -25.0 },
            ..Default::default()
        };
        let config = BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .session_pnl_config(SessionPnlConfig { loss_limit: -100.0 }) // default
            .symbol_risk("BTCUSDT", btc_override)
            .build()
            .unwrap();

        // BTC uses its override; ETH (no override) uses the default.
        let btc = Symbol::from("BTCUSDT");
        let eth = Symbol::from("ETHUSDT");
        assert_eq!(config.risk_for(&btc).session_pnl.loss_limit, -25.0);
        assert_eq!(config.risk_for(&eth).session_pnl.loss_limit, -100.0);
    }

    #[test]
    fn builder_rejects_invalid_per_symbol_override() {
        let bad_override = RiskConfig {
            session_pnl: SessionPnlConfig {
                loss_limit: f64::NAN,
            },
            ..Default::default()
        };
        let err = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .symbol_risk("BTCUSDT", bad_override)
            .build()
            .unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_has_separate_default_bus_capacities() {
        let config = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .build()
            .unwrap();
        assert_eq!(config.market_bus_capacity, 1024);
        assert_eq!(config.signal_bus_capacity, 256);
    }

    // ------------------------------------------------------------------
    // Bot construction
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn bot_requires_at_least_one_brain() {
        let outcome = Bot::new(cfg(), Arc::new(NoopExchange), vec![]);
        if !matches!(outcome, Err(Error::Config(_))) {
            // `Bot` is summarised as a fixed string so the message does not
            // depend on `Bot` implementing `Debug`.
            let described = outcome
                .map(|_| "Ok(Bot)")
                .map_err(|e| format!("Err({e})"));
            panic!(
                "expected Error::Config for empty brain list, got {:?}",
                described
            );
        }
    }

    #[tokio::test]
    async fn bot_constructs_and_exposes_handle() {
        let bot = Bot::new(cfg(), Arc::new(NoopExchange), vec![Arc::new(NoopBrain)]).unwrap();

        let first = bot.handle();
        assert!(!first.is_shutting_down());
        assert_eq!(bot.config().name, "test");

        // A cloned handle reports the same state.
        let second = first.clone();
        assert!(!second.is_shutting_down());
    }

    #[tokio::test]
    async fn rejects_overlapping_owned_symbols() {
        // Two brains both claim BTCUSDT → config error.
        let a = owning_brain("a", &["BTCUSDT"]);
        let b = owning_brain("b", &["BTCUSDT", "ETHUSDT"]);

        let outcome = Bot::new(two_symbol_cfg(), Arc::new(NoopExchange), vec![a, b]);
        assert!(
            matches!(outcome, Err(Error::Config(_))),
            "overlapping owned_symbols must be rejected"
        );
    }

    #[tokio::test]
    async fn accepts_disjoint_owned_symbols() {
        // Disjoint ownership is fine.
        let a = owning_brain("a", &["BTCUSDT"]);
        let b = owning_brain("b", &["ETHUSDT"]);

        let outcome = Bot::new(two_symbol_cfg(), Arc::new(NoopExchange), vec![a, b]);
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn none_owners_are_not_guarded() {
        // Two `None` (catch-all) brains coexist — they opt out of the guard.
        let outcome = Bot::new(
            cfg(),
            Arc::new(NoopExchange),
            vec![Arc::new(NoopBrain), Arc::new(NoopBrain)],
        );
        assert!(outcome.is_ok());
    }

    // Compile-time reference that keeps the `Fill` import in use.
    #[allow(dead_code)]
    fn _noop_fill_compiles(_: &Fill) {}
}