rustrade/bot.rs
1//! The [`Bot`] entry point and its [`BotConfig`] builder.
2//!
3//! `Bot::new` validates configuration and constructs a supervised runtime.
4//! `Bot::run_until_shutdown` starts the framework services and blocks
5//! until shutdown is triggered (via signal or [`BotHandle::shutdown`]).
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use rustrade_core::{
12 AssetClass, Brain, CandleSource, Capability, Error, ExchangeClient, FillSource, MarketDataBus,
13 MarketSource, MetricsSink, NoopSink, Position, Result, SignalBus, StateStore, Symbol,
14};
15use rustrade_risk::{CircuitBreakerConfig, PortfolioRiskConfig, SessionPnlConfig, SizingConfig};
16use rustrade_supervisor::{Supervisor, SupervisorConfig};
17use tokio_util::sync::CancellationToken;
18
19use crate::execution::{ExecutionContext, ExecutionService};
20use crate::handle::BotHandle;
21use crate::order_tracker::{OrderReaperService, OrderTracker};
22use crate::risk_state::{
23 PortfolioRiskState, PositionCache, RiskPersister, RiskStateMap, build_portfolio_risk,
24 build_position_cache, build_risk_state,
25};
26use crate::risk_sweep::RiskSweepService;
27use crate::services::{CandlePollerService, FillRoutingService, MarketFeedService};
28
/// Default capacity of the in-process market-data bus, applied by
/// [`BotConfigBuilder::build`] when no override was set.
const DEFAULT_MARKET_BUS_CAPACITY: usize = 1024;
/// Default capacity of the in-process signal bus, applied by
/// [`BotConfigBuilder::build`] when no override was set.
const DEFAULT_SIGNAL_BUS_CAPACITY: usize = 256;
/// How often the risk sweep runs: detects the 00:00 UTC rollover for the
/// per-symbol and account risk state, and refreshes the portfolio daily-loss
/// latch. Coarse on purpose — these are day-scale controls.
const RISK_SWEEP_CADENCE: Duration = Duration::from_secs(15);
35
/// Risk-layer config for a symbol: session-PnL cap, circuit breaker, and
/// position sizing. The same type is used in three places:
///
/// - as the bot-wide default ([`BotConfig::risk`]),
/// - as a per-symbol override ([`BotConfig::per_symbol_risk`], set via
///   [`BotConfigBuilder::symbol_risk`]),
/// - as a per-asset-class override ([`BotConfig::per_class_risk`], set via
///   [`BotConfigBuilder::class_risk`]).
///
/// [`BotConfig::resolve_risk`] picks the one that applies to a given symbol.
#[derive(Debug, Clone, Default)]
pub struct RiskConfig {
    /// Session PnL config for the symbols this `RiskConfig` applies to.
    pub session_pnl: SessionPnlConfig,
    /// Circuit-breaker config for the symbols this `RiskConfig` applies to.
    pub circuit_breaker: CircuitBreakerConfig,
    /// Position-sizing config used by the execution service.
    pub sizing: SizingConfig,
}
50
51impl RiskConfig {
52 /// Per-[`AssetClass`] starting presets. These are **sensible defaults to
53 /// tune**, not prescriptions — they mainly differ in leverage (the most
54 /// class-defining knob) and per-trade size; tighten or loosen for your
55 /// venue and risk appetite. Apply one per class via
56 /// [`BotConfigBuilder::class_risk`], or per symbol via
57 /// [`BotConfigBuilder::symbol_risk`].
58 ///
59 /// Crypto perpetuals: moderate 5× leverage (the framework default shape).
60 #[must_use]
61 pub fn crypto_perp() -> Self {
62 Self {
63 session_pnl: SessionPnlConfig { loss_limit: -50.0 },
64 circuit_breaker: CircuitBreakerConfig::default(),
65 sizing: SizingConfig {
66 margin_per_trade: 500.0,
67 leverage: 5,
68 max_contracts: 50,
69 },
70 }
71 }
72
73 /// Crypto spot: **no leverage** (1×) — one unit traded is one base unit.
74 #[must_use]
75 pub fn crypto_spot() -> Self {
76 Self {
77 session_pnl: SessionPnlConfig { loss_limit: -50.0 },
78 circuit_breaker: CircuitBreakerConfig::default(),
79 sizing: SizingConfig {
80 margin_per_trade: 500.0,
81 leverage: 1,
82 max_contracts: 50,
83 },
84 }
85 }
86
87 /// FX: higher leverage is conventional; a tighter sliding breaker.
88 #[must_use]
89 pub fn fx() -> Self {
90 Self {
91 session_pnl: SessionPnlConfig { loss_limit: -100.0 },
92 circuit_breaker: CircuitBreakerConfig {
93 loss_limit: 3,
94 window_secs: 7_200,
95 cooldown_secs: 3_600,
96 },
97 sizing: SizingConfig {
98 margin_per_trade: 1_000.0,
99 leverage: 20,
100 max_contracts: 50,
101 },
102 }
103 }
104
105 /// Dated futures: bigger contracts, moderate leverage, a wider daily cap.
106 #[must_use]
107 pub fn futures() -> Self {
108 Self {
109 session_pnl: SessionPnlConfig { loss_limit: -200.0 },
110 circuit_breaker: CircuitBreakerConfig::default(),
111 sizing: SizingConfig {
112 margin_per_trade: 1_000.0,
113 leverage: 10,
114 max_contracts: 20,
115 },
116 }
117 }
118
119 /// Cash equities: **no leverage** (1×), conservative size.
120 #[must_use]
121 pub fn equity() -> Self {
122 Self {
123 session_pnl: SessionPnlConfig { loss_limit: -200.0 },
124 circuit_breaker: CircuitBreakerConfig::default(),
125 sizing: SizingConfig {
126 margin_per_trade: 1_000.0,
127 leverage: 1,
128 max_contracts: 20,
129 },
130 }
131 }
132
133 /// The starting preset for an [`AssetClass`]: `CryptoSpot` →
134 /// [`Self::crypto_spot`], `Fx` → [`Self::fx`], `Future` → [`Self::futures`],
135 /// `Equity` → [`Self::equity`]; `CryptoPerp` and any other class fall back
136 /// to [`Self::crypto_perp`].
137 #[must_use]
138 pub fn preset_for(class: AssetClass) -> Self {
139 match class {
140 AssetClass::CryptoSpot => Self::crypto_spot(),
141 AssetClass::Fx => Self::fx(),
142 AssetClass::Future => Self::futures(),
143 AssetClass::Equity => Self::equity(),
144 AssetClass::CryptoPerp => Self::crypto_perp(),
145 // `AssetClass` is #[non_exhaustive]: `Other` and any future class
146 // fall back to the crypto-perp shape.
147 _ => Self::crypto_perp(),
148 }
149 }
150}
151
/// Configuration for a [`Bot`].
///
/// Construct via [`BotConfig::builder`]. The builder validates every
/// field on [`BotConfigBuilder::build`] and returns `Error::Config` on
/// any violation — the framework never panics on bad config. `Bot::new`
/// adds two brain-level checks on top: at least one brain must be
/// supplied, and no two brains may declare ownership of the same symbol.
///
/// # Example
///
/// ```
/// use std::time::Duration;
/// use rustrade::BotConfig;
///
/// let config = BotConfig::builder()
///     .name("market-maker")
///     .symbols(["BTCUSDT", "ETHUSDT"])
///     .shutdown_timeout(Duration::from_secs(5))
///     .without_signal_handler() // tests + embedded use
///     .build()
///     .unwrap();
///
/// assert_eq!(config.name, "market-maker");
/// assert_eq!(config.symbols.len(), 2);
/// ```
#[derive(Debug, Clone)]
pub struct BotConfig {
    /// Human-readable name used in logs, tracing spans, and supervisor
    /// service identification.
    pub name: String,
    /// Symbols this bot trades. Every symbol gets a pre-seeded entry in
    /// the risk-state map and the position cache. Must be non-empty —
    /// the position cache and risk-state map would otherwise be empty,
    /// which is a silent footgun.
    pub symbols: Vec<Symbol>,
    /// Maximum time to wait for services to drain on shutdown. Must be
    /// `> 0`; the supervisor's drain logic needs a non-zero deadline.
    pub shutdown_timeout: Duration,
    /// Whether the supervisor installs its own Ctrl-C / SIGTERM handler.
    /// Disable when the host service drives shutdown via [`BotHandle::shutdown`].
    pub install_signal_handler: bool,
    /// Capacity of the in-process market-data broadcast bus. Backed by
    /// `tokio::sync::broadcast`, which has **drop-oldest** semantics: a
    /// slow subscriber that falls behind by more than `capacity` events
    /// sees `RecvError::Lagged(n)` and the oldest dropped events are
    /// gone. Size this to absorb the worst-case latency between
    /// publish and slowest subscriber's `recv`. Must be `> 0`.
    pub market_bus_capacity: usize,
    /// Capacity of the in-process signal broadcast bus. Same drop-oldest
    /// semantics as `market_bus_capacity`. Typically smaller — signals
    /// are emitted ~once per non-Hold decision, far less frequent than
    /// market events. Must be `> 0`.
    pub signal_bus_capacity: usize,
    /// On shutdown, attempt to close any open position for each symbol
    /// before exit, using `ExchangeClient::close_position`. Best-effort:
    /// failures are logged but do not propagate.
    pub close_positions_on_shutdown: bool,
    /// Risk-layer defaults: the last fallback in [`Self::resolve_risk`],
    /// used for a symbol that has neither an entry in
    /// [`Self::per_symbol_risk`] nor a matching entry in
    /// [`Self::per_class_risk`].
    pub risk: RiskConfig,
    /// Per-symbol risk overrides. A symbol present here uses its own
    /// [`RiskConfig`] (session-PnL cap, circuit breaker, and sizing) ahead
    /// of any per-class override and of [`Self::risk`] — e.g. a tighter
    /// drawdown cap on a volatile alt, or a larger size on a flagship
    /// symbol. Symbols absent here fall through to the per-class override,
    /// then the default. Resolve with [`Self::resolve_risk`].
    pub per_symbol_risk: HashMap<Symbol, RiskConfig>,
    /// Per-[`AssetClass`] risk overrides. A symbol whose
    /// [`InstrumentSpec`](rustrade_core::InstrumentSpec) reports a class present
    /// here uses that class's [`RiskConfig`] — unless a per-symbol override
    /// also exists, which wins. Lets one bot apply crypto-perp / spot / FX /
    /// futures rules side by side. See [`RiskConfig::preset_for`] for starting
    /// presets and [`Self::resolve_risk`] for the precedence.
    pub per_class_risk: HashMap<AssetClass, RiskConfig>,
    /// Account-wide risk applied across **all** symbols: a daily-loss halt,
    /// a max-concurrent-positions cap, and a gross-exposure cap. Complements
    /// the per-symbol [`RiskConfig`] gates. Defaults to all-off (opt-in), so
    /// a bot that doesn't set it behaves exactly as before.
    pub portfolio: PortfolioRiskConfig,
}
230
231impl BotConfig {
232 /// Begin building a config with sensible defaults.
233 pub fn builder() -> BotConfigBuilder {
234 BotConfigBuilder::default()
235 }
236
237 /// The effective [`RiskConfig`] for `symbol`, ignoring asset class: its
238 /// per-symbol override if set, else the bot-wide default ([`Self::risk`]).
239 /// Prefer [`Self::resolve_risk`], which also honours per-class overrides.
240 pub fn risk_for(&self, symbol: &Symbol) -> &RiskConfig {
241 self.per_symbol_risk.get(symbol).unwrap_or(&self.risk)
242 }
243
244 /// The effective [`RiskConfig`] for `symbol` of the given `asset_class`,
245 /// applying the precedence **per-symbol → per-class → default**. The
246 /// framework resolves `asset_class` from
247 /// [`ExchangeClient::instrument_spec`] at startup, so a multi-asset bot
248 /// gets the right rules per symbol without listing every one.
249 pub fn resolve_risk(&self, symbol: &Symbol, asset_class: AssetClass) -> &RiskConfig {
250 self.per_symbol_risk
251 .get(symbol)
252 .or_else(|| self.per_class_risk.get(&asset_class))
253 .unwrap_or(&self.risk)
254 }
255}
256
257/// Builder for [`BotConfig`].
258#[derive(Debug, Clone, Default)]
259pub struct BotConfigBuilder {
260 name: Option<String>,
261 symbols: Vec<Symbol>,
262 shutdown_timeout: Option<Duration>,
263 install_signal_handler: Option<bool>,
264 market_bus_capacity: Option<usize>,
265 signal_bus_capacity: Option<usize>,
266 close_positions_on_shutdown: Option<bool>,
267 risk: RiskConfig,
268 per_symbol_risk: HashMap<Symbol, RiskConfig>,
269 per_class_risk: HashMap<AssetClass, RiskConfig>,
270 portfolio: PortfolioRiskConfig,
271}
272
273impl BotConfigBuilder {
274 /// Human-readable bot name (logs, supervisor identification). Required.
275 pub fn name(mut self, name: impl Into<String>) -> Self {
276 self.name = Some(name.into());
277 self
278 }
279
280 /// Add a single symbol. Repeated calls accumulate.
281 pub fn symbol(mut self, sym: impl Into<Symbol>) -> Self {
282 self.symbols.push(sym.into());
283 self
284 }
285
286 /// Add many symbols at once. Repeated calls accumulate.
287 pub fn symbols<I, S>(mut self, syms: I) -> Self
288 where
289 I: IntoIterator<Item = S>,
290 S: Into<Symbol>,
291 {
292 self.symbols.extend(syms.into_iter().map(Into::into));
293 self
294 }
295
296 /// Maximum time to wait for services to drain on shutdown.
297 pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
298 self.shutdown_timeout = Some(dur);
299 self
300 }
301
302 /// Disable the supervisor's signal handler — host drives shutdown.
303 pub fn without_signal_handler(mut self) -> Self {
304 self.install_signal_handler = Some(false);
305 self
306 }
307
308 /// Override the in-process market-data bus capacity (default 1024).
309 pub fn market_bus_capacity(mut self, cap: usize) -> Self {
310 self.market_bus_capacity = Some(cap);
311 self
312 }
313
314 /// Override the in-process signal-bus capacity (default 256).
315 pub fn signal_bus_capacity(mut self, cap: usize) -> Self {
316 self.signal_bus_capacity = Some(cap);
317 self
318 }
319
320 /// Enable best-effort `exchange.close_position` for non-flat positions
321 /// after the supervisor drains.
322 pub fn close_positions_on_shutdown(mut self, b: bool) -> Self {
323 self.close_positions_on_shutdown = Some(b);
324 self
325 }
326
327 /// Override the session-PnL config used for every symbol.
328 pub fn session_pnl_config(mut self, cfg: SessionPnlConfig) -> Self {
329 self.risk.session_pnl = cfg;
330 self
331 }
332
333 /// Override the circuit-breaker config used for every symbol.
334 pub fn circuit_breaker_config(mut self, cfg: CircuitBreakerConfig) -> Self {
335 self.risk.circuit_breaker = cfg;
336 self
337 }
338
339 /// Override the position-sizing config used for every symbol.
340 pub fn sizing_config(mut self, cfg: SizingConfig) -> Self {
341 self.risk.sizing = cfg;
342 self
343 }
344
345 /// Set the account-wide [`PortfolioRiskConfig`] — a daily-loss halt, a
346 /// max-concurrent-positions cap, and a gross-exposure cap applied across
347 /// all symbols. Unset, every limit is off (the per-symbol gates still apply).
348 ///
349 /// ```
350 /// use rustrade::{BotConfig, PortfolioRiskConfig};
351 ///
352 /// let cfg = BotConfig::builder()
353 /// .name("bot")
354 /// .symbols(["BTCUSDT", "ETHUSDT"])
355 /// .without_signal_handler()
356 /// .portfolio_config(PortfolioRiskConfig {
357 /// max_daily_loss: -500.0,
358 /// max_concurrent_positions: 3,
359 /// max_gross_exposure: 50_000.0,
360 /// })
361 /// .build()
362 /// .unwrap();
363 /// assert_eq!(cfg.portfolio.max_concurrent_positions, 3);
364 /// ```
365 pub fn portfolio_config(mut self, cfg: PortfolioRiskConfig) -> Self {
366 self.portfolio = cfg;
367 self
368 }
369
370 /// Set a per-symbol [`RiskConfig`] override. The given symbol then uses
371 /// this config (session-PnL cap, circuit breaker, sizing) instead of the
372 /// bot-wide default. Repeated calls for the same symbol replace the
373 /// previous override. Symbols without an override use the default.
374 ///
375 /// ```
376 /// use rustrade::{BotConfig, RiskConfig};
377 /// use rustrade::SessionPnlConfig;
378 ///
379 /// let cfg = BotConfig::builder()
380 /// .name("bot")
381 /// .symbols(["BTCUSDT", "DOGEUSDT"])
382 /// .without_signal_handler()
383 /// // tighter daily loss cap on the volatile alt
384 /// .symbol_risk("DOGEUSDT", RiskConfig {
385 /// session_pnl: SessionPnlConfig { loss_limit: -20.0 },
386 /// ..Default::default()
387 /// })
388 /// .build()
389 /// .unwrap();
390 /// assert_eq!(cfg.risk_for(&"DOGEUSDT".into()).session_pnl.loss_limit, -20.0);
391 /// ```
392 pub fn symbol_risk(mut self, symbol: impl Into<Symbol>, cfg: RiskConfig) -> Self {
393 self.per_symbol_risk.insert(symbol.into(), cfg);
394 self
395 }
396
397 /// Set a per-[`AssetClass`] [`RiskConfig`] override. Symbols whose
398 /// instrument reports this class use `cfg` (unless a per-symbol override
399 /// also exists, which wins). See [`RiskConfig::preset_for`] for ready-made
400 /// presets to start from.
401 ///
402 /// ```
403 /// use rustrade::{AssetClass, BotConfig, RiskConfig};
404 ///
405 /// let cfg = BotConfig::builder()
406 /// .name("multi-asset")
407 /// .symbols(["XBTUSDTM", "EURUSD"])
408 /// .without_signal_handler()
409 /// .class_risk(AssetClass::CryptoPerp, RiskConfig::crypto_perp())
410 /// .class_risk(AssetClass::Fx, RiskConfig::fx())
411 /// .build()
412 /// .unwrap();
413 /// // FX symbols resolve to the FX preset (20× leverage) absent a
414 /// // per-symbol override.
415 /// assert_eq!(
416 /// cfg.resolve_risk(&"EURUSD".into(), AssetClass::Fx).sizing.leverage,
417 /// 20
418 /// );
419 /// ```
420 pub fn class_risk(mut self, class: AssetClass, cfg: RiskConfig) -> Self {
421 self.per_class_risk.insert(class, cfg);
422 self
423 }
424
425 /// Validate and build. Returns `Error::Config` on any constraint
426 /// violation — the framework never panics on bad config.
427 pub fn build(self) -> Result<BotConfig> {
428 let name = self
429 .name
430 .filter(|n| !n.trim().is_empty())
431 .ok_or_else(|| Error::config("BotConfig.name is required and must not be empty"))?;
432
433 if self.symbols.is_empty() {
434 return Err(Error::config(
435 "BotConfig.symbols must contain at least one Symbol — \
436 the position cache and risk-state map are pre-seeded per symbol",
437 ));
438 }
439
440 let market_bus_capacity = self
441 .market_bus_capacity
442 .unwrap_or(DEFAULT_MARKET_BUS_CAPACITY);
443 if market_bus_capacity == 0 {
444 return Err(Error::config(
445 "BotConfig.market_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
446 ));
447 }
448
449 let signal_bus_capacity = self
450 .signal_bus_capacity
451 .unwrap_or(DEFAULT_SIGNAL_BUS_CAPACITY);
452 if signal_bus_capacity == 0 {
453 return Err(Error::config(
454 "BotConfig.signal_bus_capacity must be > 0 (broadcast channel cannot have 0 slots)",
455 ));
456 }
457
458 let shutdown_timeout = self.shutdown_timeout.unwrap_or(Duration::from_secs(30));
459 if shutdown_timeout.is_zero() {
460 return Err(Error::config(
461 "BotConfig.shutdown_timeout must be > 0 — drain needs a non-zero deadline",
462 ));
463 }
464
465 // Validate the default risk config and every per-symbol / per-class
466 // override with the same rules — bad config never reaches the runtime.
467 validate_risk(&self.risk, "BotConfig.risk")?;
468 for (sym, cfg) in &self.per_symbol_risk {
469 validate_risk(cfg, &format!("BotConfig.per_symbol_risk[{sym}]"))?;
470 }
471 for (class, cfg) in &self.per_class_risk {
472 validate_risk(cfg, &format!("BotConfig.per_class_risk[{class:?}]"))?;
473 }
474 validate_portfolio(&self.portfolio)?;
475
476 Ok(BotConfig {
477 name,
478 symbols: self.symbols,
479 shutdown_timeout,
480 install_signal_handler: self.install_signal_handler.unwrap_or(true),
481 market_bus_capacity,
482 signal_bus_capacity,
483 close_positions_on_shutdown: self.close_positions_on_shutdown.unwrap_or(false),
484 risk: self.risk,
485 per_symbol_risk: self.per_symbol_risk,
486 per_class_risk: self.per_class_risk,
487 portfolio: self.portfolio,
488 })
489 }
490}
491
492/// Validate a [`RiskConfig`] (the default or a per-symbol override).
493/// `ctx` names the offending field in the error.
494fn validate_risk(risk: &RiskConfig, ctx: &str) -> Result<()> {
495 if risk.session_pnl.loss_limit.is_nan() {
496 return Err(Error::config(format!(
497 "{ctx}.session_pnl.loss_limit must not be NaN"
498 )));
499 }
500 if !risk.sizing.margin_per_trade.is_finite() || risk.sizing.margin_per_trade < 0.0 {
501 return Err(Error::config(format!(
502 "{ctx}.sizing.margin_per_trade must be a finite non-negative number"
503 )));
504 }
505 Ok(())
506}
507
508/// Validate the account-wide [`PortfolioRiskConfig`]. `NaN` limits would make
509/// every comparison silently false (a disabled limit that looks enabled), so
510/// they're rejected; `±∞` is allowed as the explicit "disabled" sentinel.
511fn validate_portfolio(p: &PortfolioRiskConfig) -> Result<()> {
512 if p.max_daily_loss.is_nan() {
513 return Err(Error::config(
514 "BotConfig.portfolio.max_daily_loss must not be NaN (use f64::NEG_INFINITY to disable)",
515 ));
516 }
517 if p.max_gross_exposure.is_nan() {
518 return Err(Error::config(
519 "BotConfig.portfolio.max_gross_exposure must not be NaN (use f64::INFINITY to disable)",
520 ));
521 }
522 Ok(())
523}
524
/// The embedded trading bot.
///
/// Owns a [`Supervisor`], an [`ExchangeClient`], one or more [`Brain`]s,
/// and the in-process [`MarketDataBus`]. Created via [`Bot::new`]; run
/// via [`Bot::run_until_shutdown`]; observed and steered via the
/// [`BotHandle`] returned from [`Bot::handle`].
///
/// Optional collaborators (metrics sink, state store, market / fill sources,
/// candle pollers, order tracking, external cancel token) are attached with
/// the consuming `with_*` methods between `new` and `run_until_shutdown`.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
/// use rustrade::{Bot, BotConfig};
/// # struct MyBrain;
/// # #[async_trait::async_trait]
/// # impl rustrade_core::Brain for MyBrain {
/// #     fn name(&self) -> &str { "x" }
/// #     async fn on_event(&self, _: &rustrade_core::MarketDataEvent, _: &rustrade_core::Position)
/// #         -> rustrade_core::Result<rustrade_core::Decision> {
/// #         Ok(rustrade_core::Decision::hold())
/// #     }
/// #     async fn health(&self) -> rustrade_core::BrainHealth { rustrade_core::BrainHealth::ok() }
/// # }
/// # async fn run(exchange: Arc<dyn rustrade_core::ExchangeClient>) -> anyhow::Result<()> {
/// let config = BotConfig::builder()
///     .name("my-bot")
///     .symbol("BTCUSDT")
///     .build()?;
///
/// let bot = Bot::new(config, exchange, vec![Arc::new(MyBrain) as Arc<dyn rustrade_core::Brain>])?;
/// let handle = bot.handle();
///
/// // Spawn the bot; ask it to shut down from elsewhere.
/// let task = tokio::spawn(async move { bot.run_until_shutdown().await });
/// handle.shutdown();
/// task.await??;
/// # Ok(())
/// # }
/// ```
pub struct Bot {
    /// Validated configuration; exposed via [`Bot::config`].
    config: BotConfig,
    /// Supervisor built in [`Bot::new`] from the config's shutdown timeout and
    /// signal-handler flag; also held by the [`BotHandle`].
    supervisor: Arc<Supervisor>,
    /// Exchange adapter passed to [`Bot::new`].
    exchange: Arc<dyn ExchangeClient>,
    /// Brains passed to [`Bot::new`] (never empty); also held by the handle.
    brains: Arc<Vec<Arc<dyn Brain>>>,
    /// Market-data bus, sized by `config.market_bus_capacity`.
    market_bus: MarketDataBus,
    /// Signal bus, sized by `config.signal_bus_capacity`.
    signal_bus: SignalBus,
    /// Position cache built from `config.symbols`.
    positions: PositionCache,
    /// Per-symbol risk state built from `config.symbols`, each symbol using
    /// the session-PnL and circuit-breaker config resolved for it.
    risk: RiskStateMap,
    /// Metrics sink; [`NoopSink`] unless replaced via [`Bot::with_metrics`].
    metrics: Arc<dyn MetricsSink>,
    /// Optional persistence backend, set via [`Bot::with_state_store`].
    state_store: Option<Arc<dyn StateStore>>,
    /// Slot shared with the handle; filled with the risk persister at startup
    /// when a state store is wired.
    persister_slot: crate::handle::PersisterSlot,
    /// The handle cloned out by [`Bot::handle`].
    handle: BotHandle,
    /// Optional host-owned token, set via [`Bot::with_external_cancel`].
    external_cancel: Option<CancellationToken>,
    /// Optional market source, set via [`Bot::with_market_source`].
    market_source: Option<Arc<dyn MarketSource>>,
    /// Optional fill source, set via [`Bot::with_fill_source`].
    fill_source: Option<Arc<dyn FillSource>>,
    /// Candle pollers registered via [`Bot::with_candle_poller`].
    candle_pollers: Vec<CandlePollerSpec>,
    /// Resting-order tracker; also held by the handle.
    order_tracker: OrderTracker,
    /// Order-tracking settings, set via [`Bot::with_order_tracking`].
    order_tracking: Option<OrderTrackingSpec>,
}
583
/// Settings captured by `Bot::with_order_tracking`.
struct OrderTrackingSpec {
    /// Age after which a resting order is cancelled by the reaper (per the
    /// `with_order_tracking` docs).
    ttl: Duration,
    /// How often a reaper sweep runs (per the `with_order_tracking` docs).
    poll_cadence: Duration,
}
589
/// One registered `(symbol, interval, cadence)` for `Bot::with_candle_poller`.
struct CandlePollerSpec {
    /// Candle source to poll.
    source: Arc<dyn CandleSource>,
    /// Symbol whose candles are polled.
    symbol: Symbol,
    /// Candle interval requested from the source.
    interval: Duration,
    /// How often the source is polled.
    poll_cadence: Duration,
    /// NOTE(review): presumably the maximum number of candles requested per
    /// poll — the consumer is not visible here, confirm against
    /// `CandlePollerService`.
    limit: usize,
}
598
599impl Bot {
600 /// Construct a `Bot`. Validates that at least one brain is provided.
601 ///
602 /// The exchange client and brain set are immutable for the bot's
603 /// lifetime — to change them, build a new `Bot`.
604 pub fn new(
605 config: BotConfig,
606 exchange: Arc<dyn ExchangeClient>,
607 brains: Vec<Arc<dyn Brain>>,
608 ) -> Result<Self> {
609 if brains.is_empty() {
610 return Err(Error::config(
611 "Bot::new requires at least one Brain — empty brain list",
612 ));
613 }
614
615 // Multi-brain arbitration guard: no two brains may claim ownership of
616 // the same symbol via `Brain::owned_symbols` — that would let two
617 // strategies fight over one position. Brains returning `None` opt out
618 // and self-coordinate (current behaviour).
619 let mut claimed: HashMap<Symbol, String> = HashMap::new();
620 for brain in &brains {
621 let Some(syms) = brain.owned_symbols() else {
622 continue;
623 };
624 for sym in syms.into_iter().collect::<std::collections::HashSet<_>>() {
625 if let Some(other) = claimed.insert(sym.clone(), brain.name().to_string()) {
626 return Err(Error::config(format!(
627 "brains '{other}' and '{}' both declare ownership of {sym} — \
628 overlapping owned_symbols would fight over one position",
629 brain.name()
630 )));
631 }
632 }
633 }
634
635 let supervisor = Arc::new(Supervisor::new(
636 SupervisorConfig::default()
637 .with_shutdown_timeout(config.shutdown_timeout)
638 .with_default_backoff(Default::default())
639 .pipe(|c| {
640 if config.install_signal_handler {
641 c
642 } else {
643 c.without_signal_handler()
644 }
645 }),
646 ));
647
648 let market_bus = MarketDataBus::with_capacity(config.market_bus_capacity);
649 let signal_bus = SignalBus::with_capacity(config.signal_bus_capacity);
650 let positions = build_position_cache(&config.symbols);
651 let risk = build_risk_state(&config.symbols, |sym| {
652 // Resolve per-symbol → per-class (by the adapter's asset class) →
653 // default, so a multi-asset bot gets the right gates per symbol.
654 let class = exchange.instrument_spec(sym).asset_class;
655 let r = config.resolve_risk(sym, class);
656 (r.session_pnl.clone(), r.circuit_breaker.clone())
657 });
658
659 let brains = Arc::new(brains);
660 let persister_slot: crate::handle::PersisterSlot = Arc::new(std::sync::OnceLock::new());
661 let order_tracker = OrderTracker::new();
662 let handle = BotHandle::new(
663 supervisor.clone(),
664 brains.clone(),
665 risk.clone(),
666 positions.clone(),
667 signal_bus.clone(),
668 persister_slot.clone(),
669 order_tracker.clone(),
670 );
671
672 Ok(Self {
673 config,
674 supervisor,
675 exchange,
676 brains,
677 market_bus,
678 signal_bus,
679 positions,
680 risk,
681 metrics: Arc::new(NoopSink),
682 state_store: None,
683 persister_slot,
684 order_tracker,
685 handle,
686 external_cancel: None,
687 market_source: None,
688 fill_source: None,
689 candle_pollers: Vec::new(),
690 order_tracking: None,
691 })
692 }
693
694 /// Install a [`MetricsSink`]. The framework's services emit
695 /// counters and histograms to this sink on every observable event;
696 /// the default is [`NoopSink`], which discards everything.
697 pub fn with_metrics(mut self, sink: Arc<dyn MetricsSink>) -> Self {
698 self.metrics = sink;
699 self
700 }
701
702 /// Install a [`StateStore`] so per-symbol risk state (session PnL and
703 /// circuit breaker) survives restarts.
704 ///
705 /// Without a store, risk state is in-memory only: a crash mid-session
706 /// resets the daily drawdown cap and the loss-streak breaker. With one
707 /// wired, the bot:
708 ///
709 /// - **Restores** each symbol's snapshot on startup, then applies the
710 /// stale-snapshot policy — a session from an earlier UTC day rolls
711 /// over to fresh, and a breaker whose cooldown elapsed during
712 /// downtime auto-resets.
713 /// - **Persists** after every realised trade (whether fed via
714 /// [`BotHandle::record_trade_outcome`](crate::BotHandle::record_trade_outcome)
715 /// or auto-routed by the `FillRoutingService`).
716 /// - **Flushes** on graceful shutdown.
717 ///
718 /// Use [`rustrade_core::InMemoryStore`] for a non-durable default, or a
719 /// disk-/database-backed implementation from a downstream crate for
720 /// real durability. Snapshots are keyed by `(bot name, symbol)`, so
721 /// distinct bots can share one backend without collision.
722 pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
723 self.state_store = Some(store);
724 self
725 }
726
727 /// Enable resting-order lifecycle tracking.
728 ///
729 /// The [`ExecutionService`] records every resting order it places (limit
730 /// / post-only / IOC / FOK — market orders never rest, so they're
731 /// skipped), and a supervised [`OrderReaperService`] periodically:
732 ///
733 /// - **reconciles** the tracker against `exchange.get_open_orders` (so
734 /// orders filled or cancelled out-of-band stop being tracked), and
735 /// - **cancels** any resting order older than `ttl` via
736 /// `exchange.cancel_order`.
737 ///
738 /// `poll_cadence` is how often a sweep runs. The reaper is only spawned
739 /// if the adapter advertises [`Capability::OrderTracking`]; otherwise the
740 /// call is a no-op (with a warning at startup), since the framework can't
741 /// list or cancel orders without it. Live tracked orders are visible via
742 /// [`BotHandle::tracked_orders`](crate::BotHandle::tracked_orders).
743 pub fn with_order_tracking(mut self, ttl: Duration, poll_cadence: Duration) -> Self {
744 self.order_tracking = Some(OrderTrackingSpec { ttl, poll_cadence });
745 self
746 }
747
748 /// Register a [`CandleSource`] to be polled every `poll_cadence` for
749 /// `(symbol, interval)`. Polled candles are deduplicated by
750 /// timestamp and published to the bot's [`MarketDataBus`]. Repeated
751 /// calls accumulate — one supervised service per registered tuple.
752 pub fn with_candle_poller(
753 mut self,
754 source: Arc<dyn CandleSource>,
755 symbol: impl Into<Symbol>,
756 interval: Duration,
757 poll_cadence: Duration,
758 limit: usize,
759 ) -> Self {
760 self.candle_pollers.push(CandlePollerSpec {
761 source,
762 symbol: symbol.into(),
763 interval,
764 poll_cadence,
765 limit,
766 });
767 self
768 }
769
770 /// Tie this bot's shutdown to an externally-owned cancellation token.
771 ///
772 /// When the external token is cancelled, the bot's supervisor token
773 /// is cancelled too — equivalent to calling [`BotHandle::shutdown`]
774 /// but without spawning a linker task in the host.
775 ///
776 /// The reverse is not true: cancelling the bot does not cancel the
777 /// external token.
778 pub fn with_external_cancel(mut self, token: CancellationToken) -> Self {
779 self.external_cancel = Some(token);
780 self
781 }
782
783 /// Attach a [`MarketSource`] to be driven by a supervised
784 /// [`MarketFeedService`]. Source implementors are responsible for
785 /// publishing to the bot's [`MarketDataBus`] (obtain via
786 /// `bot.market_data_bus().clone()` before constructing the source).
787 pub fn with_market_source(mut self, source: Arc<dyn MarketSource>) -> Self {
788 self.market_source = Some(source);
789 self
790 }
791
792 /// Attach a [`FillSource`] to be driven by a supervised
793 /// [`FillRoutingService`]. Fills are routed to every brain via
794 /// `Brain::on_fill` and the position cache is refreshed from the
795 /// exchange after each one.
796 pub fn with_fill_source(mut self, source: Arc<dyn FillSource>) -> Self {
797 self.fill_source = Some(source);
798 self
799 }
800
801 /// Cheap cloneable handle for host services. Can be obtained at any
802 /// point — call before [`Self::run_until_shutdown`] so the host can
803 /// drive shutdown while the bot is running.
804 pub fn handle(&self) -> BotHandle {
805 self.handle.clone()
806 }
807
808 /// Reference to the bot's configuration.
809 pub fn config(&self) -> &BotConfig {
810 &self.config
811 }
812
813 /// Borrow the in-process market-data bus. Host services and adapters
814 /// publish here; the bot's framework services subscribe.
815 pub fn market_data_bus(&self) -> &MarketDataBus {
816 &self.market_bus
817 }
818
819 /// Borrow the in-process signal bus. The execution service publishes
820 /// a [`Signal`](rustrade_core::Signal) to this bus on every
821 /// non-`Hold` decision the brain emits; host services subscribe via
822 /// [`BotHandle::subscribe_signals`].
823 pub fn signal_bus(&self) -> &SignalBus {
824 &self.signal_bus
825 }
826
827 /// Spawn the framework services and run until shutdown.
828 ///
829 /// Returns after all spawned services have drained (or the configured
830 /// shutdown timeout elapses). Consumes `self` to make the
831 /// "construct → run → exit" lifecycle explicit; persistent observation
832 /// of the running bot is done via the [`BotHandle`] obtained earlier.
833 ///
834 /// # Runtime requirements
835 ///
836 /// - **Multi-thread tokio runtime.** The supervisor spawns each
837 /// service onto `tokio::spawn`. A current-thread runtime works
838 /// for small loads but loses the per-service parallelism the
839 /// framework is designed for. Use
840 /// `#[tokio::main(flavor = "multi_thread")]` or
841 /// `tokio::runtime::Builder::new_multi_thread()` in the host.
842 /// - **`tokio::spawn` is used internally.** Anywhere the host
843 /// embeds this method, a tokio runtime context must be active.
844 /// - **No nested runtimes.** `Bot::run_until_shutdown` is async; do
845 /// not call `block_on` on it from inside another runtime.
846 ///
847 /// # Resource expectations
848 ///
849 /// - **Memory per active symbol:** O(few hundred bytes) for the
850 /// position cache entry, `SymbolRisk` (a `SessionPnl` plus a
851 /// `CircuitBreaker` whose ring buffer is bounded by
852 /// `loss_limit`), plus the per-symbol slot in any host-owned
853 /// subscriber.
854 /// - **Channel buffers:** `market_bus_capacity` + `signal_bus_capacity`
855 /// slots per bus, each slot holding a clone of `MarketDataEvent` /
856 /// `Signal`. Drop-oldest semantics — back-pressure is *not*
857 /// propagated to publishers.
858 /// - **Expected shutdown time:** ≤ `shutdown_timeout`. A
859 /// well-behaved service responds to its cancel token in
860 /// milliseconds; the timeout is the worst-case bound, not the
861 /// typical case.
862 /// - **Restart-after-crash latency:** bounded by
863 /// [`BackoffConfig`](rustrade_supervisor::BackoffConfig). Defaults:
864 /// 100 ms base, 60 s cap, 10 retries within a 10-minute window
865 /// before the circuit breaker trips.
866 pub async fn run_until_shutdown(self) -> anyhow::Result<()> {
867 tracing::info!(
868 bot = %self.config.name,
869 brains = self.brains.len(),
870 symbols = self.config.symbols.len(),
871 exchange = %self.exchange.name(),
872 "rustrade Bot starting"
873 );
874
875 // Best-effort position prefetch — failures don't block startup.
876 self.prefetch_positions().await;
877
878 // Restore persisted risk state (if a store is wired) before any
879 // service runs, then publish the persister so the per-trade paths
880 // can save through it.
881 let persister = self
882 .state_store
883 .clone()
884 .map(|store| RiskPersister::new(store, self.config.name.clone()));
885 if let Some(p) = &persister {
886 p.restore_into(&self.risk).await;
887 // OnceLock: set is infallible the first (only) time per bot.
888 let _ = self.persister_slot.set(p.clone());
889 }
890
891 // Order tracking: only active when wired AND the adapter can list +
892 // cancel orders. Gate once here so both the execution service (which
893 // records resting orders) and the reaper agree.
894 let order_tracking_active =
895 self.order_tracking.is_some() && self.exchange.supports(Capability::OrderTracking);
896 if self.order_tracking.is_some() && !order_tracking_active {
897 tracing::warn!(
898 exchange = %self.exchange.name(),
899 "order tracking requested but adapter lacks Capability::OrderTracking — \
900 resting orders will NOT be tracked or aged out"
901 );
902 }
903
904 // Bracket (SL+TP / OCO) orders need three things: place protective
905 // stops (StopOrders), cancel the sibling on fill (OrderTracking), and
906 // detect that fill (a fill source). When all hold, a shared OCO
907 // registry links each bracket's legs; otherwise a brain emitting both
908 // SL+TP falls back to a single attached stop-loss (see
909 // `ExecutionService::attach_protection`).
910 let brackets_active = self.exchange.supports(Capability::StopOrders)
911 && self.exchange.supports(Capability::OrderTracking)
912 && self.fill_source.is_some();
913 let oco = brackets_active.then(crate::order_tracker::OcoRegistry::new);
914 tracing::info!(brackets_active, "bracket (SL+TP/OCO) support");
915
916 // Resolve each symbol's effective sizing (per-symbol → per-class by
917 // asset class → default) so multi-asset bots size each class correctly.
918 let sizing = Arc::new(crate::execution::SymbolSizing::new(
919 self.config.risk.sizing.clone(),
920 self.config
921 .symbols
922 .iter()
923 .map(|s| {
924 let class = self.exchange.instrument_spec(s).asset_class;
925 (s.clone(), self.config.resolve_risk(s, class).sizing.clone())
926 })
927 .collect(),
928 ));
929 // Account-wide portfolio risk, shared between the execution gate
930 // (reads it) and the risk sweep (maintains its daily-loss latch).
931 let portfolio: PortfolioRiskState = build_portfolio_risk(self.config.portfolio.clone());
932
933 let ctx = ExecutionContext {
934 exchange: self.exchange.clone(),
935 bus: self.market_bus.clone(),
936 signals: self.signal_bus.clone(),
937 positions: self.positions.clone(),
938 risk: self.risk.clone(),
939 portfolio: portfolio.clone(),
940 sizing,
941 order_tracker: order_tracking_active.then(|| self.order_tracker.clone()),
942 oco: oco.clone(),
943 };
944
945 for brain in self.brains.iter() {
946 let svc = ExecutionService::new(brain.clone(), ctx.clone());
947 self.supervisor.spawn_service(Box::new(svc));
948 }
949
950 // Periodic risk sweep: rolls per-symbol SessionPnl / CircuitBreaker and
951 // the portfolio halt over at 00:00 UTC during a live run (without it,
952 // tick() only fires on restart), and re-derives the account daily-loss
953 // latch from the per-symbol session PnLs.
954 self.supervisor
955 .spawn_service(Box::new(RiskSweepService::new(
956 self.risk.clone(),
957 portfolio.clone(),
958 RISK_SWEEP_CADENCE,
959 )));
960
961 if order_tracking_active {
962 // Safe: order_tracking_active implies order_tracking is Some.
963 let spec = self.order_tracking.as_ref().unwrap();
964 self.supervisor
965 .spawn_service(Box::new(OrderReaperService::new(
966 self.exchange.clone(),
967 self.order_tracker.clone(),
968 self.config.symbols.clone(),
969 spec.ttl,
970 spec.poll_cadence,
971 self.metrics.clone(),
972 )));
973 }
974
975 if let Some(source) = self.market_source.clone() {
976 self.supervisor
977 .spawn_service(Box::new(MarketFeedService::new(source)));
978 }
979
980 if let Some(source) = self.fill_source.clone() {
981 self.supervisor
982 .spawn_service(Box::new(FillRoutingService::new(
983 source,
984 self.brains.clone(),
985 self.exchange.clone(),
986 self.positions.clone(),
987 self.risk.clone(),
988 self.metrics.clone(),
989 persister.clone(),
990 oco.clone(),
991 )));
992 }
993
994 for spec in &self.candle_pollers {
995 self.supervisor
996 .spawn_service(Box::new(CandlePollerService::new(
997 spec.source.clone(),
998 spec.symbol.clone(),
999 spec.interval,
1000 spec.poll_cadence,
1001 spec.limit,
1002 self.market_bus.clone(),
1003 self.metrics.clone(),
1004 )));
1005 }
1006
1007 // External cancellation linker: when the host's token fires,
1008 // cancel the supervisor's root token. The reverse is not wired.
1009 if let Some(external) = self.external_cancel.clone() {
1010 let supervisor = self.supervisor.clone();
1011 tokio::spawn(async move {
1012 external.cancelled().await;
1013 tracing::info!("external cancellation received; triggering bot shutdown");
1014 supervisor.trigger_shutdown();
1015 });
1016 }
1017
1018 let run_result = self.supervisor.run_until_shutdown().await;
1019
1020 if self.config.close_positions_on_shutdown {
1021 self.close_open_positions().await;
1022 }
1023
1024 // Persist final risk state + flush before exit so the next boot
1025 // restores an up-to-date snapshot.
1026 if let Some(p) = &persister {
1027 p.persist_all(&self.risk).await;
1028 }
1029
1030 for brain in self.brains.iter() {
1031 let health = brain.health().await;
1032 tracing::info!(
1033 brain = %brain.name(),
1034 healthy = health.healthy,
1035 events = health.events_processed,
1036 non_hold = health.non_hold_decisions,
1037 "final brain health"
1038 );
1039 }
1040
1041 tracing::info!(bot = %self.config.name, "rustrade Bot exited");
1042 run_result
1043 }
1044
1045 async fn prefetch_positions(&self) {
1046 for symbol in &self.config.symbols {
1047 match self.exchange.get_position(symbol).await {
1048 Ok(pos) => {
1049 self.positions.write().await.insert(symbol.clone(), pos);
1050 tracing::debug!(
1051 symbol = %symbol,
1052 qty = pos.qty,
1053 "prefetched position from exchange"
1054 );
1055 }
1056 Err(e) => {
1057 tracing::warn!(
1058 symbol = %symbol,
1059 error = %e,
1060 "failed to prefetch position; cache defaults to FLAT"
1061 );
1062 }
1063 }
1064 }
1065 }
1066
1067 async fn close_open_positions(&self) {
1068 let snapshot: Vec<(Symbol, Position)> = {
1069 let map = self.positions.read().await;
1070 map.iter()
1071 .filter(|(_, p)| !p.is_flat())
1072 .map(|(s, p)| (s.clone(), *p))
1073 .collect()
1074 };
1075
1076 if snapshot.is_empty() {
1077 tracing::info!("close_positions_on_shutdown: no open positions");
1078 return;
1079 }
1080
1081 for (symbol, position) in snapshot {
1082 match self.exchange.close_position(&symbol, &position).await {
1083 Ok(order_id) => tracing::info!(
1084 symbol = %symbol,
1085 qty = position.qty,
1086 order_id = %order_id,
1087 "close_positions_on_shutdown: closed"
1088 ),
1089 Err(e) => tracing::error!(
1090 symbol = %symbol,
1091 qty = position.qty,
1092 error = %e,
1093 "close_positions_on_shutdown: failed (best-effort)"
1094 ),
1095 }
1096 }
1097 }
1098}
1099
// Module-local `pipe` combinator for builder ergonomics: it lets the
// `Bot::new` body stay a single readable chain when a builder method is
// applied only conditionally.
trait Pipe: Sized {
    /// Hand `self` by value to `f` and return whatever `f` produces.
    fn pipe<F>(self, f: F) -> Self
    where
        F: FnOnce(Self) -> Self,
    {
        f(self)
    }
}

// Blanket impl: every sized type gets `pipe` through the default method.
impl<T> Pipe for T {}
1108
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use rustrade_core::{Fill, MarketDataEvent, Order, Position};

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Brain that answers every event with `hold` and does not override
    /// `owned_symbols`.
    struct NoopBrain;

    #[async_trait]
    impl Brain for NoopBrain {
        fn name(&self) -> &str {
            "noop"
        }

        async fn on_event(
            &self,
            _event: &MarketDataEvent,
            _position: &Position,
        ) -> Result<rustrade_core::Decision> {
            Ok(rustrade_core::Decision::hold())
        }
    }

    /// Brain that declares ownership of a fixed symbol set, for the
    /// multi-brain arbitration guard tests.
    struct OwningBrain {
        name: &'static str,
        owns: Vec<Symbol>,
    }

    #[async_trait]
    impl Brain for OwningBrain {
        fn name(&self) -> &str {
            self.name
        }

        fn owned_symbols(&self) -> Option<Vec<Symbol>> {
            Some(self.owns.clone())
        }

        async fn on_event(
            &self,
            _event: &MarketDataEvent,
            _position: &Position,
        ) -> Result<rustrade_core::Decision> {
            Ok(rustrade_core::Decision::hold())
        }
    }

    /// Exchange stub: every call succeeds with a fixed value.
    struct NoopExchange;

    #[async_trait]
    impl ExchangeClient for NoopExchange {
        fn name(&self) -> &str {
            "noop"
        }

        async fn place_order(&self, _order: &Order) -> Result<String> {
            Ok("noop-1".into())
        }

        async fn cancel_all(&self, _symbol: &Symbol) -> Result<usize> {
            Ok(0)
        }

        async fn close_position(&self, _symbol: &Symbol, _position: &Position) -> Result<String> {
            Ok("noop-close".into())
        }

        async fn get_position(&self, _symbol: &Symbol) -> Result<Position> {
            Ok(Position::FLAT)
        }

        async fn get_balance(&self, _currency: &str) -> Result<f64> {
            Ok(0.0)
        }
    }

    /// Smallest valid config: one symbol, no signal handler.
    fn minimal_config() -> BotConfig {
        BotConfig::builder()
            .name("test")
            .symbol("BTCUSDT")
            .without_signal_handler()
            .build()
            .unwrap()
    }

    /// Two-symbol config shared by the owned-symbol guard tests.
    fn two_symbol_config() -> BotConfig {
        BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .without_signal_handler()
            .build()
            .unwrap()
    }

    /// Build an [`OwningBrain`] that claims exactly `symbols`.
    fn owner(name: &'static str, symbols: &[&str]) -> Arc<OwningBrain> {
        Arc::new(OwningBrain {
            name,
            owns: symbols.iter().map(|&s| Symbol::from(s)).collect(),
        })
    }

    // ------------------------------------------------------------------
    // RiskConfig presets and resolution
    // ------------------------------------------------------------------

    #[test]
    fn presets_differ_in_leverage_by_class() {
        let expectations = [
            (RiskConfig::crypto_perp(), 5),
            (RiskConfig::crypto_spot(), 1),
            (RiskConfig::fx(), 20),
            (RiskConfig::futures(), 10),
            (RiskConfig::equity(), 1),
        ];
        for (preset, leverage) in expectations {
            assert_eq!(preset.sizing.leverage, leverage);
        }
    }

    #[test]
    fn preset_for_maps_each_class_with_perp_fallback() {
        let expectations = [
            (AssetClass::CryptoSpot, 1),
            (AssetClass::Fx, 20),
            (AssetClass::Future, 10),
            (AssetClass::Equity, 1),
            // CryptoPerp and the catch-all (`Other` / future classes) → perp shape.
            (AssetClass::CryptoPerp, 5),
            (AssetClass::Other, 5),
        ];
        for (class, leverage) in expectations {
            assert_eq!(RiskConfig::preset_for(class).sizing.leverage, leverage);
        }
    }

    #[test]
    fn resolve_risk_precedence_symbol_then_class_then_default() {
        let eth_override = RiskConfig {
            sizing: SizingConfig {
                margin_per_trade: 1.0,
                leverage: 7,
                max_contracts: 1,
            },
            ..Default::default()
        };
        let config = BotConfig::builder()
            .name("t")
            .symbols(["XBTUSDTM", "EURUSD", "ETHUSDTM"])
            .without_signal_handler()
            .class_risk(AssetClass::Fx, RiskConfig::fx())
            .symbol_risk("ETHUSDTM", eth_override)
            .build()
            .unwrap();

        let eth = Symbol::from("ETHUSDTM");
        let eur = Symbol::from("EURUSD");
        let xbt = Symbol::from("XBTUSDTM");

        // Per-symbol override wins even when a class override would also match.
        assert_eq!(config.resolve_risk(&eth, AssetClass::Fx).sizing.leverage, 7);
        // No per-symbol override → the per-class override applies.
        assert_eq!(config.resolve_risk(&eur, AssetClass::Fx).sizing.leverage, 20);
        // Neither → the bot-wide default.
        let default_leverage = RiskConfig::default().sizing.leverage;
        assert_eq!(
            config
                .resolve_risk(&xbt, AssetClass::CryptoPerp)
                .sizing
                .leverage,
            default_leverage
        );
    }

    // ------------------------------------------------------------------
    // BotConfig builder validation
    // ------------------------------------------------------------------

    #[test]
    fn builder_requires_name() {
        let err = BotConfig::builder().build().unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_rejects_blank_name() {
        let err = BotConfig::builder().name("   ").build().unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_rejects_zero_market_bus_capacity() {
        let built = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .market_bus_capacity(0)
            .build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_rejects_zero_signal_bus_capacity() {
        let built = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .signal_bus_capacity(0)
            .build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_rejects_empty_symbol_list() {
        let built = BotConfig::builder().name("x").build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_rejects_zero_shutdown_timeout() {
        let built = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .shutdown_timeout(Duration::ZERO)
            .build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_rejects_nan_loss_limit() {
        let nan_limit = SessionPnlConfig {
            loss_limit: f64::NAN,
        };
        let built = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .session_pnl_config(nan_limit)
            .build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_rejects_non_finite_margin() {
        let infinite_margin = SizingConfig {
            margin_per_trade: f64::INFINITY,
            leverage: 1,
            max_contracts: 1,
        };
        let built = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .sizing_config(infinite_margin)
            .build();
        assert!(matches!(built, Err(Error::Config(_))));
    }

    #[test]
    fn builder_accumulates_symbols() {
        let config = BotConfig::builder()
            .name("x")
            .symbol("A")
            .symbols(["B", "C"])
            .build()
            .unwrap();
        assert_eq!(config.symbols.len(), 3);
        assert_eq!(config.symbols.first(), Some(&Symbol::new("A")));
        assert_eq!(config.symbols.last(), Some(&Symbol::new("C")));
    }

    #[test]
    fn builder_accepts_risk_overrides() {
        let config = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .session_pnl_config(SessionPnlConfig { loss_limit: -123.0 })
            .sizing_config(SizingConfig {
                margin_per_trade: 250.0,
                leverage: 10,
                max_contracts: 5,
            })
            .build()
            .unwrap();
        let risk = &config.risk;
        assert_eq!(risk.session_pnl.loss_limit, -123.0);
        assert_eq!(risk.sizing.leverage, 10);
    }

    #[test]
    fn per_symbol_risk_override_and_fallback() {
        let btc_override = RiskConfig {
            session_pnl: SessionPnlConfig { loss_limit: -25.0 },
            ..Default::default()
        };
        let config = BotConfig::builder()
            .name("x")
            .symbols(["BTCUSDT", "ETHUSDT"])
            .session_pnl_config(SessionPnlConfig { loss_limit: -100.0 }) // default
            .symbol_risk("BTCUSDT", btc_override)
            .build()
            .unwrap();

        // BTC uses its override; ETH (no override) uses the default.
        for (symbol, expected_limit) in [("BTCUSDT", -25.0), ("ETHUSDT", -100.0)] {
            assert_eq!(
                config
                    .risk_for(&Symbol::from(symbol))
                    .session_pnl
                    .loss_limit,
                expected_limit
            );
        }
    }

    #[test]
    fn builder_rejects_invalid_per_symbol_override() {
        let nan_override = RiskConfig {
            session_pnl: SessionPnlConfig {
                loss_limit: f64::NAN,
            },
            ..Default::default()
        };
        let err = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .symbol_risk("BTCUSDT", nan_override)
            .build()
            .unwrap_err();
        assert!(matches!(err, Error::Config(_)), "got {err:?}");
    }

    #[test]
    fn builder_has_separate_default_bus_capacities() {
        let config = BotConfig::builder()
            .name("x")
            .symbol("BTCUSDT")
            .build()
            .unwrap();
        assert_eq!(
            (config.market_bus_capacity, config.signal_bus_capacity),
            (1024, 256)
        );
    }

    // ------------------------------------------------------------------
    // Bot construction
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn bot_requires_at_least_one_brain() {
        let outcome = Bot::new(minimal_config(), Arc::new(NoopExchange), Vec::new());
        match outcome {
            Err(Error::Config(_)) => {}
            other => panic!(
                "expected Error::Config for empty brain list, got {:?}",
                other.map(|_| "Ok(Bot)").map_err(|e| format!("Err({e})"))
            ),
        }
    }

    #[tokio::test]
    async fn bot_constructs_and_exposes_handle() {
        let bot = Bot::new(
            minimal_config(),
            Arc::new(NoopExchange),
            vec![Arc::new(NoopBrain)],
        )
        .unwrap();

        let handle = bot.handle();
        assert!(!handle.is_shutting_down());
        assert_eq!(bot.config().name, "test");

        // A cloned handle reports the same state.
        let cloned = handle.clone();
        assert!(!cloned.is_shutting_down());
    }

    #[tokio::test]
    async fn rejects_overlapping_owned_symbols() {
        // Two brains both claim BTCUSDT → config error.
        let a = owner("a", &["BTCUSDT"]);
        let b = owner("b", &["BTCUSDT", "ETHUSDT"]);
        let outcome = Bot::new(two_symbol_config(), Arc::new(NoopExchange), vec![a, b]);
        assert!(
            matches!(outcome, Err(Error::Config(_))),
            "overlapping owned_symbols must be rejected"
        );
    }

    #[tokio::test]
    async fn accepts_disjoint_owned_symbols() {
        // Disjoint ownership is fine.
        let a = owner("a", &["BTCUSDT"]);
        let b = owner("b", &["ETHUSDT"]);
        let outcome = Bot::new(two_symbol_config(), Arc::new(NoopExchange), vec![a, b]);
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn none_owners_are_not_guarded() {
        // Two `None` (catch-all) brains coexist — they opt out of the guard.
        let outcome = Bot::new(
            minimal_config(),
            Arc::new(NoopExchange),
            vec![Arc::new(NoopBrain), Arc::new(NoopBrain)],
        );
        assert!(outcome.is_ok());
    }

    // Never called; presumably kept so the `Fill` import above stays
    // referenced. `dead_code` is allowed for that reason.
    #[allow(dead_code)]
    fn _noop_fill_compiles(_: &Fill) {}
}