1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, AtomicI64, Ordering},
6 Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{debug, error, info, trace, warn};
19use uuid::Uuid;
20
21use serde_json::{json, Value};
22
23fn ensure_builtin_connectors_registered() {
24 static INIT: Once = Once::new();
25 INIT.call_once(|| {
26 register_connector_factory(Arc::new(PaperFactory::default()));
27 #[cfg(feature = "bybit")]
28 register_bybit_factory();
29 #[cfg(feature = "binance")]
30 register_binance_factory();
31 });
32}
33
34#[cfg(feature = "binance")]
35use tesser_binance::{
36 fill_from_update, order_from_update, register_factory as register_binance_factory,
37 ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
38 BinanceClient,
39};
40use tesser_broker::{
41 get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
42 ConnectorStream, ConnectorStreamConfig, ExecutionClient, RouterExecutionClient,
43};
44#[cfg(feature = "bybit")]
45use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
46#[cfg(feature = "bybit")]
47use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
48use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
49use tesser_core::{
50 AccountBalance, AssetId, Candle, ExchangeId, Fill, Interval, Order, OrderBook, OrderStatus,
51 Position, Price, Quantity, Side, Signal, SignalKind, Symbol, Tick,
52};
53use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
54use tesser_events::{
55 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
56 TickEvent,
57};
58use tesser_execution::{
59 AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
60 PanicCloseConfig, PanicObserver, PreTradeRiskChecker, RiskContext, RiskLimits,
61 SqliteAlgoStateRepository, StoredAlgoState,
62};
63use tesser_journal::LmdbJournal;
64use tesser_markets::{InstrumentCatalog, MarketRegistry};
65use tesser_paper::{FeeScheduleConfig, PaperExecutionClient, PaperFactory};
66use tesser_portfolio::{
67 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
68};
69use tesser_strategy::{Strategy, StrategyContext};
70
71use crate::alerts::{AlertDispatcher, AlertManager};
72use crate::control;
73use crate::telemetry::{spawn_metrics_server, LiveMetrics};
74use crate::PublicChannel;
75
76#[derive(Debug)]
78pub enum BrokerEvent {
79 OrderUpdate(Order),
80 Fill(Fill),
81}
82
83struct PanicAlertHook {
84 metrics: Arc<LiveMetrics>,
85 alerts: Arc<AlertManager>,
86}
87
88impl PanicAlertHook {
89 fn new(metrics: Arc<LiveMetrics>, alerts: Arc<AlertManager>) -> Self {
90 Self { metrics, alerts }
91 }
92}
93
94impl PanicObserver for PanicAlertHook {
95 fn on_group_event(&self, group_id: Uuid, symbol: Symbol, quantity: Quantity, reason: &str) {
96 self.metrics.inc_panic_close();
97 let alerts = self.alerts.clone();
98 let title = "Execution group panic close";
99 let message = format!("Group {group_id} panic-closed {symbol} qty={quantity}: {reason}");
100 tokio::spawn(async move {
101 alerts.notify(title, &message).await;
102 });
103 }
104}
105
106#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
107#[value(rename_all = "kebab-case")]
108pub enum ExecutionBackend {
109 Paper,
110 Live,
111}
112
113impl ExecutionBackend {
114 fn is_paper(self) -> bool {
115 matches!(self, Self::Paper)
116 }
117}
118
119#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
120#[value(rename_all = "kebab-case")]
121pub enum PersistenceBackend {
122 Sqlite,
123 Lmdb,
124}
125
126impl From<PersistenceBackend> for PersistenceEngine {
127 fn from(value: PersistenceBackend) -> Self {
128 match value {
129 PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
130 PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
131 }
132 }
133}
134
/// Default order-book depth, exposed through [`default_order_book_depth`].
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order-book depth (`50`).
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}

// NOTE(review): the two strategy thresholds are not referenced in the visible
// part of the file; presumably they gate slow-lock / slow-call warnings.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);

/// Upper bound on how long the run loop waits for a single market poll
/// (tick, candle, or order book) before moving on.
const MARKET_EVENT_TIMEOUT: Duration = Duration::from_millis(10);
143
144#[async_trait::async_trait]
145trait LiveMarketStream: Send {
146 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
147 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
148 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
149}
150
151struct FactoryStreamAdapter {
152 inner: Box<dyn ConnectorStream>,
153}
154
155impl FactoryStreamAdapter {
156 fn new(inner: Box<dyn ConnectorStream>) -> Self {
157 Self { inner }
158 }
159}
160
161#[async_trait::async_trait]
162impl LiveMarketStream for FactoryStreamAdapter {
163 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
164 self.inner.next_tick().await
165 }
166
167 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
168 self.inner.next_candle().await
169 }
170
171 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
172 self.inner.next_order_book().await
173 }
174}
175
176struct RouterMarketStream {
177 tick_rx: mpsc::Receiver<Tick>,
178 candle_rx: mpsc::Receiver<Candle>,
179 book_rx: mpsc::Receiver<OrderBook>,
180 tasks: Vec<JoinHandle<()>>,
181}
182
183impl RouterMarketStream {
184 fn new(streams: Vec<(String, Box<dyn LiveMarketStream>)>, shutdown: ShutdownSignal) -> Self {
185 let (tick_tx, tick_rx) = mpsc::channel(512);
186 let (candle_tx, candle_rx) = mpsc::channel(512);
187 let (book_tx, book_rx) = mpsc::channel(512);
188 let mut tasks = Vec::new();
189 for (name, mut stream) in streams {
190 let tick_tx = tick_tx.clone();
191 let candle_tx = candle_tx.clone();
192 let book_tx = book_tx.clone();
193 let shutdown = shutdown.clone();
194 tasks.push(tokio::spawn(async move {
195 loop {
196 if shutdown.triggered() {
197 break;
198 }
199 let mut emitted = false;
200
201 let tick = tokio::select! {
202 res = stream.next_tick() => res,
203 _ = shutdown.wait() => break,
204 };
205 match tick {
206 Ok(Some(event)) => {
207 emitted = true;
208 if tick_tx.send(event).await.is_err() {
209 break;
210 }
211 }
212 Ok(None) => {}
213 Err(err) => {
214 warn!(exchange = %name, error = %err, "market stream tick failed");
215 break;
216 }
217 }
218
219 let candle = tokio::select! {
220 res = stream.next_candle() => res,
221 _ = shutdown.wait() => break,
222 };
223 match candle {
224 Ok(Some(event)) => {
225 emitted = true;
226 if candle_tx.send(event).await.is_err() {
227 break;
228 }
229 }
230 Ok(None) => {}
231 Err(err) => {
232 warn!(exchange = %name, error = %err, "market stream candle failed");
233 break;
234 }
235 }
236
237 let book = tokio::select! {
238 res = stream.next_order_book() => res,
239 _ = shutdown.wait() => break,
240 };
241 match book {
242 Ok(Some(event)) => {
243 emitted = true;
244 if book_tx.send(event).await.is_err() {
245 break;
246 }
247 }
248 Ok(None) => {}
249 Err(err) => {
250 warn!(exchange = %name, error = %err, "market stream order book failed");
251 break;
252 }
253 }
254
255 if !emitted && !shutdown.sleep(Duration::from_millis(5)).await {
256 break;
257 }
258 }
259 }));
260 }
261 Self {
262 tick_rx,
263 candle_rx,
264 book_rx,
265 tasks,
266 }
267 }
268}
269
270#[async_trait::async_trait]
271impl LiveMarketStream for RouterMarketStream {
272 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
273 Ok(self.tick_rx.recv().await)
274 }
275
276 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
277 Ok(self.candle_rx.recv().await)
278 }
279
280 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
281 Ok(self.book_rx.recv().await)
282 }
283}
284
285impl Drop for RouterMarketStream {
286 fn drop(&mut self) {
287 for handle in &self.tasks {
288 handle.abort();
289 }
290 }
291}
292
293#[derive(Clone)]
294pub struct PersistenceSettings {
295 pub engine: PersistenceEngine,
296 pub state_path: PathBuf,
297 pub algo_path: PathBuf,
298}
299
300impl PersistenceSettings {
301 pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
302 let algo_path = match engine {
303 PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
304 PersistenceEngine::Lmdb => state_path.clone(),
305 };
306 Self {
307 engine,
308 state_path,
309 algo_path,
310 }
311 }
312
313 fn algo_repo_path(&self) -> &PathBuf {
314 &self.algo_path
315 }
316}
317
318struct PersistenceHandles {
319 state: Arc<dyn StateRepository<Snapshot = LiveState>>,
320 algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
321}
322
323#[derive(Clone)]
324pub struct NamedExchange {
325 pub name: String,
326 pub config: ExchangeConfig,
327}
328
329struct ExchangeRoute {
330 name: String,
331 driver: String,
332 #[cfg(feature = "binance")]
333 ws_url: String,
334 execution: Arc<dyn ExecutionClient>,
335}
336
337struct ExchangeBuildResult {
338 execution_client: Arc<dyn ExecutionClient>,
339 router: Option<Arc<RouterExecutionClient>>,
340 market_stream: Box<dyn LiveMarketStream>,
341 routes: Vec<ExchangeRoute>,
342}
343
344pub struct LiveSessionSettings {
345 pub category: PublicChannel,
346 pub interval: Interval,
347 pub quantity: Quantity,
348 pub slippage_bps: Decimal,
349 pub fee_bps: Decimal,
350 pub history: usize,
351 pub metrics_addr: SocketAddr,
352 pub persistence: PersistenceSettings,
353 pub initial_balances: HashMap<AssetId, Decimal>,
354 pub reporting_currency: AssetId,
355 pub markets_file: Option<PathBuf>,
356 pub alerting: AlertingConfig,
357 pub exec_backend: ExecutionBackend,
358 pub risk: RiskManagementConfig,
359 pub reconciliation_interval: Duration,
360 pub reconciliation_threshold: Decimal,
361 pub orderbook_depth: usize,
362 pub record_path: Option<PathBuf>,
363 pub control_addr: SocketAddr,
364 pub panic_close: PanicCloseConfig,
365}
366
367impl LiveSessionSettings {
368 fn risk_limits(&self) -> RiskLimits {
369 RiskLimits {
370 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
371 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
372 max_order_notional: self
373 .risk
374 .max_order_notional
375 .and_then(|limit| (limit > Decimal::ZERO).then_some(limit)),
376 }
377 }
378}
379
380fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
381 match settings.persistence.engine {
382 PersistenceEngine::Sqlite => {
383 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
384 SqliteStateRepository::new(settings.persistence.state_path.clone()),
385 );
386 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
387 SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
388 );
389 Ok(PersistenceHandles {
390 state: state_repo,
391 algo: algo_repo,
392 })
393 }
394 PersistenceEngine::Lmdb => {
395 let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
396 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
397 Arc::new(journal.state_repo());
398 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
399 Arc::new(journal.algo_repo());
400 Ok(PersistenceHandles {
401 state: state_repo,
402 algo: algo_repo,
403 })
404 }
405 }
406}
407
408pub async fn run_live(
409 strategy: Box<dyn Strategy>,
410 symbols: Vec<Symbol>,
411 exchanges: Vec<NamedExchange>,
412 settings: LiveSessionSettings,
413) -> Result<()> {
414 run_live_with_shutdown(
415 strategy,
416 symbols,
417 exchanges,
418 settings,
419 ShutdownSignal::new(),
420 )
421 .await
422}
423
424pub async fn run_live_with_shutdown(
426 strategy: Box<dyn Strategy>,
427 symbols: Vec<Symbol>,
428 exchanges: Vec<NamedExchange>,
429 settings: LiveSessionSettings,
430 shutdown: ShutdownSignal,
431) -> Result<()> {
432 if symbols.is_empty() {
433 return Err(anyhow!("strategy did not declare any subscriptions"));
434 }
435 if settings.quantity <= Decimal::ZERO {
436 return Err(anyhow!("--quantity must be positive"));
437 }
438
439 let public_connection = Arc::new(AtomicBool::new(false));
440 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
441 Some(Arc::new(AtomicBool::new(false)))
442 } else {
443 None
444 };
445 ensure_builtin_connectors_registered();
446 if exchanges.is_empty() {
447 return Err(anyhow!("no exchange profiles supplied"));
448 }
449 let symbol_codes: Vec<String> = symbols
450 .iter()
451 .map(|symbol| symbol.code().to_string())
452 .collect();
453 let driver_label = exchanges
454 .iter()
455 .map(|ex| ex.config.driver.clone())
456 .collect::<Vec<_>>()
457 .join(",");
458
459 let ExchangeBuildResult {
460 execution_client,
461 router,
462 market_stream,
463 routes,
464 } = build_exchange_routes(
465 &settings,
466 &exchanges,
467 &symbols,
468 &symbol_codes,
469 public_connection.clone(),
470 shutdown.clone(),
471 )
472 .await?;
473 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
474 if matches!(settings.exec_backend, ExecutionBackend::Live) {
475 info!(drivers = %driver_label, "live execution enabled");
476 }
477 let risk_checker: Arc<dyn PreTradeRiskChecker> =
478 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
479 let execution = ExecutionEngine::new(
480 execution_client.clone(),
481 Box::new(FixedOrderSizer {
482 quantity: settings.quantity,
483 }),
484 risk_checker,
485 );
486
487 let mut bootstrap = None;
488 if matches!(settings.exec_backend, ExecutionBackend::Live) {
489 info!("synchronizing portfolio snapshot from exchange");
490 let positions = execution_client
491 .positions()
492 .await
493 .context("failed to fetch remote positions")?;
494 let balances = execution_client
495 .account_balances()
496 .await
497 .context("failed to fetch remote account balances")?;
498 let mut open_orders = Vec::new();
499 for symbol in &symbols {
500 let mut symbol_orders = execution_client
501 .list_open_orders(*symbol)
502 .await
503 .with_context(|| format!("failed to fetch open orders for {}", symbol.code()))?;
504 open_orders.append(&mut symbol_orders);
505 }
506 bootstrap = Some(LiveBootstrap {
507 positions,
508 balances,
509 open_orders,
510 });
511 }
512
513 let persistence = build_persistence_handles(&settings)?;
514
515 let metrics = Arc::new(LiveMetrics::new());
516 let alerting_cfg = settings.alerting.clone();
517 let dispatcher = AlertDispatcher::new(alerting_cfg.webhook_url.clone());
518 let alerts = Arc::new(AlertManager::new(
519 alerting_cfg,
520 dispatcher,
521 Some(public_connection.clone()),
522 private_connection.clone(),
523 ));
524 let panic_hook: Arc<dyn PanicObserver> =
525 Arc::new(PanicAlertHook::new(metrics.clone(), alerts.clone()));
526
527 let initial_open_orders = bootstrap
529 .as_ref()
530 .map(|data| data.open_orders.clone())
531 .unwrap_or_default();
532 let orchestrator = OrderOrchestrator::new(
533 Arc::new(execution),
534 persistence.algo.clone(),
535 initial_open_orders,
536 settings.panic_close,
537 Some(panic_hook.clone()),
538 )
539 .await?;
540
541 let runtime = LiveRuntime::new(
542 market_stream,
543 strategy,
544 symbols,
545 routes,
546 router.clone(),
547 orchestrator,
548 persistence.state,
549 settings,
550 metrics,
551 alerts,
552 market_registry,
553 shutdown,
554 public_connection,
555 private_connection,
556 bootstrap,
557 )
558 .await?;
559 runtime.run().await
560}
561
562async fn build_exchange_routes(
563 settings: &LiveSessionSettings,
564 exchanges: &[NamedExchange],
565 symbols: &[Symbol],
566 symbol_codes: &[String],
567 connection_flag: Arc<AtomicBool>,
568 shutdown: ShutdownSignal,
569) -> Result<ExchangeBuildResult> {
570 let mut stream_sources: Vec<(String, Box<dyn LiveMarketStream>)> = Vec::new();
571 let mut router_inputs: HashMap<ExchangeId, Arc<dyn ExecutionClient>> = HashMap::new();
572 let mut routes = Vec::new();
573
574 for exchange in exchanges {
575 let payload = build_exchange_payload(&exchange.config, settings, &exchange.name);
576 let driver = exchange.config.driver.clone();
577 let factory = get_connector_factory(&driver)
578 .ok_or_else(|| anyhow!("driver {} is not registered", driver))?;
579 let stream_config = ConnectorStreamConfig {
580 ws_url: Some(exchange.config.ws_url.clone()),
581 metadata: json!({
582 "category": settings.category.as_path(),
583 "symbols": symbol_codes,
584 "orderbook_depth": settings.orderbook_depth,
585 }),
586 connection_status: Some(connection_flag.clone()),
587 };
588 let mut connector_stream = factory
589 .create_market_stream(&payload, stream_config)
590 .await
591 .map_err(|err| {
592 anyhow!(
593 "failed to create market stream for {}: {err}",
594 exchange.name
595 )
596 })?;
597 connector_stream
598 .subscribe(symbol_codes, settings.interval)
599 .await
600 .map_err(|err| anyhow!("failed to subscribe {}: {err}", exchange.name))?;
601 stream_sources.push((
602 exchange.name.clone(),
603 Box::new(FactoryStreamAdapter::new(connector_stream)),
604 ));
605
606 let execution_client =
607 build_single_execution_client(settings, &driver, factory, &payload, symbols).await?;
608 let exchange_id = ExchangeId::from(exchange.name.as_str());
609 router_inputs.insert(exchange_id, execution_client.clone());
610 routes.push(ExchangeRoute {
611 name: exchange.name.clone(),
612 driver,
613 #[cfg(feature = "binance")]
614 ws_url: exchange.config.ws_url.clone(),
615 execution: execution_client.clone(),
616 });
617 }
618
619 let (execution_client, router_handle): (
620 Arc<dyn ExecutionClient>,
621 Option<Arc<RouterExecutionClient>>,
622 ) = if router_inputs.len() == 1 {
623 (router_inputs.into_values().next().unwrap(), None)
624 } else {
625 let router = Arc::new(RouterExecutionClient::new(router_inputs));
626 (router.clone(), Some(router))
627 };
628
629 let market_stream: Box<dyn LiveMarketStream> = if stream_sources.len() == 1 {
630 stream_sources.into_iter().next().unwrap().1
631 } else {
632 Box::new(RouterMarketStream::new(stream_sources, shutdown))
633 };
634
635 Ok(ExchangeBuildResult {
636 execution_client,
637 router: router_handle,
638 market_stream,
639 routes,
640 })
641}
642
643async fn build_single_execution_client(
644 settings: &LiveSessionSettings,
645 driver: &str,
646 connector_factory: Arc<dyn ConnectorFactory>,
647 connector_payload: &Value,
648 symbols: &[Symbol],
649) -> Result<Arc<dyn ExecutionClient>> {
650 match settings.exec_backend {
651 ExecutionBackend::Paper => {
652 if driver == "paper" {
653 return connector_factory
654 .create_execution_client(connector_payload)
655 .await
656 .map_err(|err| anyhow!("failed to create execution client: {err}"));
657 }
658 Ok(Arc::new(PaperExecutionClient::new(
659 format!("paper-{driver}"),
660 symbols.to_vec(),
661 settings.slippage_bps,
662 FeeScheduleConfig::with_defaults(
663 settings.fee_bps.max(Decimal::ZERO),
664 settings.fee_bps.max(Decimal::ZERO),
665 )
666 .build_model(),
667 )))
668 }
669 ExecutionBackend::Live => connector_factory
670 .create_execution_client(connector_payload)
671 .await
672 .map_err(|err| anyhow!("failed to create execution client: {err}")),
673 }
674}
675
676struct LiveRuntime {
677 market: Box<dyn LiveMarketStream>,
678 orchestrator: Arc<OrderOrchestrator>,
679 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
680 persisted: Arc<Mutex<LiveState>>,
681 event_bus: Arc<EventBus>,
682 recorder: Option<ParquetRecorder>,
683 control_task: Option<JoinHandle<()>>,
684 shutdown: ShutdownSignal,
685 metrics_task: JoinHandle<()>,
686 alert_task: Option<JoinHandle<()>>,
687 reconciliation_task: Option<JoinHandle<()>>,
688 reconciliation_ctx: Option<Arc<ReconciliationContext>>,
689 private_event_rx: mpsc::Receiver<BrokerEvent>,
690 #[allow(dead_code)]
691 last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
692 subscriber_handles: Vec<JoinHandle<()>>,
693 connection_monitors: Vec<JoinHandle<()>>,
694 order_timeout_task: JoinHandle<()>,
695 strategy: Arc<Mutex<Box<dyn Strategy>>>,
696 _public_connection: Arc<AtomicBool>,
697 _private_connection: Option<Arc<AtomicBool>>,
698}
699
700struct LiveBootstrap {
701 positions: Vec<Position>,
702 balances: Vec<AccountBalance>,
703 open_orders: Vec<Order>,
704}
705
706impl LiveRuntime {
707 #[allow(clippy::too_many_arguments)]
708 async fn new(
709 market: Box<dyn LiveMarketStream>,
710 mut strategy: Box<dyn Strategy>,
711 symbols: Vec<Symbol>,
712 exchanges: Vec<ExchangeRoute>,
713 router: Option<Arc<RouterExecutionClient>>,
714 orchestrator: OrderOrchestrator,
715 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
716 settings: LiveSessionSettings,
717 metrics: Arc<LiveMetrics>,
718 alerts: Arc<AlertManager>,
719 market_registry: Arc<MarketRegistry>,
720 shutdown: ShutdownSignal,
721 public_connection: Arc<AtomicBool>,
722 private_connection: Option<Arc<AtomicBool>>,
723 bootstrap: Option<LiveBootstrap>,
724 ) -> Result<Self> {
725 let mut strategy_ctx = StrategyContext::new(settings.history);
726 strategy_ctx.attach_market_registry(market_registry.clone());
727 let mut persisted = match tokio::task::spawn_blocking({
728 let repo = state_repo.clone();
729 move || repo.load()
730 })
731 .await
732 {
733 Ok(Ok(state)) => state,
734 Ok(Err(err)) => {
735 warn!(error = %err, "failed to load live state; starting from defaults");
736 LiveState::default()
737 }
738 Err(err) => {
739 warn!(error = %err, "state load task failed; starting from defaults");
740 LiveState::default()
741 }
742 };
743 let mut live_bootstrap = None;
744 if let Some(data) = bootstrap {
745 persisted.open_orders = data.open_orders;
746 live_bootstrap = Some((data.positions, data.balances));
747 } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
748 warn!("live session missing bootstrap data; continuing without remote snapshot");
749 }
750
751 let portfolio_cfg = PortfolioConfig {
752 initial_balances: settings.initial_balances.clone(),
753 reporting_currency: settings.reporting_currency,
754 max_drawdown: Some(settings.risk.max_drawdown),
755 };
756 let portfolio = if let Some((positions, balances)) = live_bootstrap {
757 Portfolio::from_exchange_state(
758 positions,
759 balances,
760 portfolio_cfg.clone(),
761 market_registry.clone(),
762 )
763 } else if let Some(snapshot) = persisted.portfolio.take() {
764 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
765 } else {
766 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
767 };
768 strategy_ctx.update_positions(portfolio.positions());
769 if let Some(state) = persisted.strategy_state.take() {
771 info!("restoring strategy state from persistence");
772 strategy
773 .restore(state)
774 .context("failed to restore strategy state")?;
775 }
776 persisted.portfolio = Some(portfolio.snapshot());
777
778 let mut market_snapshots = HashMap::new();
779 for symbol in &symbols {
780 let mut snapshot = MarketSnapshot::default();
781 if let Some(price) = persisted.last_prices.get(symbol).copied() {
782 snapshot.last_trade = Some(price);
783 }
784 market_snapshots.insert(*symbol, snapshot);
785 }
786
787 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
788 if let Some(flag) = &private_connection {
789 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
790 }
791 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
792 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
793 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
794 let alert_task = alerts.spawn_watchdog();
795 let mut connection_monitors = Vec::new();
796 connection_monitors.push(spawn_connection_monitor(
797 shutdown.clone(),
798 public_connection.clone(),
799 metrics.clone(),
800 "public",
801 ));
802 if let Some(flag) = private_connection.clone() {
803 connection_monitors.push(spawn_connection_monitor(
804 shutdown.clone(),
805 flag,
806 metrics.clone(),
807 "private",
808 ));
809 }
810
811 if !settings.exec_backend.is_paper() {
812 let router_handle = router.clone();
813 for route in &exchanges {
814 match route.driver.as_str() {
815 "bybit" | "" => {
816 #[cfg(feature = "bybit")]
817 {
818 let bybit = route
819 .execution
820 .as_ref()
821 .as_any()
822 .downcast_ref::<BybitClient>()
823 .ok_or_else(|| {
824 anyhow!("execution client for {} is not Bybit", route.name)
825 })?;
826 let creds = bybit.get_credentials().ok_or_else(|| {
827 anyhow!("live execution requires Bybit credentials")
828 })?;
829 spawn_bybit_private_stream(
830 creds,
831 bybit.get_ws_url(),
832 private_event_tx.clone(),
833 route.execution.clone(),
834 symbols.clone(),
835 last_private_sync.clone(),
836 private_connection.clone(),
837 metrics.clone(),
838 router_handle.clone(),
839 shutdown.clone(),
840 );
841 }
842 #[cfg(not(feature = "bybit"))]
843 {
844 bail!("driver 'bybit' is unavailable without the 'bybit' feature");
845 }
846 }
847 "binance" => {
848 #[cfg(feature = "binance")]
849 {
850 spawn_binance_private_stream(
851 route.execution.clone(),
852 route.ws_url.clone(),
853 private_event_tx.clone(),
854 private_connection.clone(),
855 metrics.clone(),
856 router_handle.clone(),
857 shutdown.clone(),
858 );
859 }
860 #[cfg(not(feature = "binance"))]
861 {
862 bail!("driver 'binance' is unavailable without the 'binance' feature");
863 }
864 }
865 "paper" => {}
866 other => {
867 bail!("private stream unsupported for driver '{other}'");
868 }
869 }
870 }
871 }
872
873 let recorder = if let Some(record_path) = settings.record_path.clone() {
874 let config = RecorderConfig {
875 root: record_path.clone(),
876 ..RecorderConfig::default()
877 };
878 match ParquetRecorder::spawn(config).await {
879 Ok(recorder) => {
880 info!(path = %record_path.display(), "flight recorder enabled");
881 Some(recorder)
882 }
883 Err(err) => {
884 warn!(
885 error = %err,
886 path = %record_path.display(),
887 "failed to start flight recorder"
888 );
889 None
890 }
891 }
892 } else {
893 None
894 };
895 let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
896
897 let strategy = Arc::new(Mutex::new(strategy));
898 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
899 let portfolio = Arc::new(Mutex::new(portfolio));
900 let market_cache = Arc::new(Mutex::new(market_snapshots));
901 let persisted = Arc::new(Mutex::new(persisted));
902 let orchestrator = Arc::new(orchestrator);
903 let event_bus = Arc::new(EventBus::new(2048));
904 let last_data_timestamp = Arc::new(AtomicI64::new(0));
905 let control_task = control::spawn_control_plane(
906 settings.control_addr,
907 control::ControlPlaneComponents {
908 portfolio: portfolio.clone(),
909 orchestrator: orchestrator.clone(),
910 persisted: persisted.clone(),
911 last_data_timestamp: last_data_timestamp.clone(),
912 event_bus: event_bus.clone(),
913 strategy: strategy.clone(),
914 shutdown: shutdown.clone(),
915 },
916 );
917 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
918 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
919 client: orchestrator.execution_engine().client(),
920 portfolio: portfolio.clone(),
921 persisted: persisted.clone(),
922 state_repo: state_repo.clone(),
923 alerts: alerts.clone(),
924 metrics: metrics.clone(),
925 reporting_currency: settings.reporting_currency,
926 threshold: settings.reconciliation_threshold,
927 }))
928 });
929 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
930 spawn_reconciliation_loop(
931 ctx.clone(),
932 shutdown.clone(),
933 settings.reconciliation_interval,
934 )
935 });
936 let driver_summary = Arc::new(if exchanges.is_empty() {
937 "unknown".to_string()
938 } else {
939 exchanges
940 .iter()
941 .map(|route| route.driver.clone())
942 .collect::<Vec<_>>()
943 .join(",")
944 });
945 let subscriber_handles = spawn_event_subscribers(
946 event_bus.clone(),
947 strategy.clone(),
948 strategy_ctx.clone(),
949 orchestrator.clone(),
950 portfolio.clone(),
951 metrics.clone(),
952 alerts.clone(),
953 market_cache.clone(),
954 state_repo.clone(),
955 persisted.clone(),
956 settings.exec_backend,
957 recorder_handle.clone(),
958 last_data_timestamp.clone(),
959 driver_summary.clone(),
960 market_registry.clone(),
961 );
962 let order_timeout_task = spawn_order_timeout_monitor(
963 orchestrator.clone(),
964 event_bus.clone(),
965 alerts.clone(),
966 shutdown.clone(),
967 );
968
969 info!(
970 symbols = ?symbols,
971 category = ?settings.category,
972 metrics_addr = %settings.metrics_addr,
973 state_path = %settings.persistence.state_path.display(),
974 persistence_engine = ?settings.persistence.engine,
975 history = settings.history,
976 "market stream ready"
977 );
978
979 for symbol in &symbols {
980 let ctx = shared_risk_context(
981 *symbol,
982 &portfolio,
983 &market_cache,
984 &persisted,
985 &market_registry,
986 )
987 .await;
988 orchestrator.update_risk_context(*symbol, ctx);
989 }
990
991 Ok(Self {
992 market,
993 orchestrator,
994 state_repo,
995 persisted,
996 event_bus,
997 recorder,
998 control_task: Some(control_task),
999 shutdown,
1000 metrics_task,
1001 alert_task,
1002 reconciliation_task,
1003 reconciliation_ctx,
1004 private_event_rx,
1005 last_private_sync,
1006 subscriber_handles,
1007 connection_monitors,
1008 order_timeout_task,
1009 strategy,
1010 _public_connection: public_connection,
1011 _private_connection: private_connection,
1012 })
1013 }
1014
1015 async fn run(mut self) -> Result<()> {
1016 info!("live session started");
1017 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
1018 perform_state_reconciliation(ctx.as_ref())
1019 .await
1020 .context("initial state reconciliation failed")?;
1021 }
1022 let backoff = Duration::from_millis(200);
1023 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
1024
1025 'run: while !self.shutdown.triggered() {
1026 let mut progressed = false;
1027
1028 let tick = tokio::select! {
1029 res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_tick()) => Some(res),
1030 _ = self.shutdown.wait() => None,
1031 };
1032 match tick {
1033 Some(Ok(Ok(Some(tick)))) => {
1034 progressed = true;
1035 self.event_bus.publish(Event::Tick(TickEvent { tick }));
1036 }
1037 Some(Ok(Ok(None))) => {}
1038 Some(Ok(Err(err))) => return Err(err.into()),
1039 Some(Err(_)) => {}
1040 None => break 'run,
1041 }
1042
1043 let candle = tokio::select! {
1044 res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_candle()) => Some(res),
1045 _ = self.shutdown.wait() => None,
1046 };
1047 match candle {
1048 Some(Ok(Ok(Some(candle)))) => {
1049 progressed = true;
1050 self.event_bus
1051 .publish(Event::Candle(CandleEvent { candle }));
1052 }
1053 Some(Ok(Ok(None))) => {}
1054 Some(Ok(Err(err))) => return Err(err.into()),
1055 Some(Err(_)) => {}
1056 None => break 'run,
1057 }
1058
1059 let book = tokio::select! {
1060 res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_order_book()) => Some(res),
1061 _ = self.shutdown.wait() => None,
1062 };
1063 match book {
1064 Some(Ok(Ok(Some(book)))) => {
1065 progressed = true;
1066 self.event_bus
1067 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
1068 }
1069 Some(Ok(Ok(None))) => {}
1070 Some(Ok(Err(err))) => return Err(err.into()),
1071 Some(Err(_)) => {}
1072 None => break 'run,
1073 }
1074
1075 tokio::select! {
1076 biased;
1077 Some(event) = self.private_event_rx.recv() => {
1078 progressed = true;
1079 match event {
1080 BrokerEvent::OrderUpdate(order) => {
1081 info!(
1082 order_id = %order.id,
1083 status = ?order.status,
1084 symbol = %order.request.symbol,
1085 "received private order update"
1086 );
1087 self.event_bus
1088 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1089 }
1090 BrokerEvent::Fill(fill) => {
1091 info!(
1092 order_id = %fill.order_id,
1093 symbol = %fill.symbol,
1094 qty = %fill.fill_quantity,
1095 price = %fill.fill_price,
1096 "received private fill"
1097 );
1098 self.event_bus.publish(Event::Fill(FillEvent { fill }));
1099 }
1100 }
1101 }
1102 _ = orchestrator_timer.tick() => {
1103 if let Err(e) = self.orchestrator.on_timer_tick().await {
1105 error!("Orchestrator timer tick failed: {}", e);
1106 }
1107 }
1108 _ = self.shutdown.wait() => break 'run,
1109 else => {}
1110 }
1111
1112 if !progressed && !self.shutdown.sleep(backoff).await {
1113 break;
1114 }
1115 }
1116 info!("live session stopping");
1117 self.metrics_task.abort();
1118 if let Some(handle) = self.alert_task.take() {
1119 handle.abort();
1120 }
1121 if let Some(handle) = self.reconciliation_task.take() {
1122 handle.abort();
1123 }
1124 self.order_timeout_task.abort();
1125 for handle in self.subscriber_handles.drain(..) {
1126 handle.abort();
1127 }
1128 for handle in self.connection_monitors.drain(..) {
1129 handle.abort();
1130 }
1131 if let Err(err) = persist_state(
1132 self.state_repo.clone(),
1133 self.persisted.clone(),
1134 Some(self.strategy.clone()),
1135 )
1136 .await
1137 {
1138 warn!(error = %err, "failed to persist shutdown state");
1139 }
1140 if let Some(task) = self.control_task.take() {
1141 if let Err(err) = task.await {
1142 warn!(error = %err, "control plane server task aborted");
1143 }
1144 }
1145 if let Some(recorder) = self.recorder.take() {
1146 if let Err(err) = recorder.shutdown().await {
1147 warn!(error = %err, "failed to flush flight recorder");
1148 }
1149 }
1150 Ok(())
1151 }
1152}
1153
/// Shared handles and settings used by the state reconciliation routines.
struct ReconciliationContext {
    /// Execution client queried for remote positions and account balances.
    client: Arc<dyn ExecutionClient>,
    /// Local portfolio whose positions and cash are compared with the remote view.
    portfolio: Arc<Mutex<Portfolio>>,
    /// In-memory live state; receives the portfolio snapshot on a liquidate-only transition.
    persisted: Arc<Mutex<LiveState>>,
    /// Repository handed to `persist_state` when the live state is saved.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Alert sink notified when a divergence reaches the threshold.
    alerts: Arc<AlertManager>,
    /// Metrics sink that receives position and balance difference updates.
    metrics: Arc<LiveMetrics>,
    /// Asset whose remote available balance is compared with local cash.
    reporting_currency: AssetId,
    /// Normalized difference at or above which a mismatch is treated as severe.
    threshold: Decimal,
}
1164
/// Construction parameters for `ReconciliationContext::new`; the fields mirror
/// those of `ReconciliationContext` one-to-one.
struct ReconciliationContextConfig {
    /// Execution client used to fetch the remote account state.
    client: Arc<dyn ExecutionClient>,
    /// Local portfolio to reconcile.
    portfolio: Arc<Mutex<Portfolio>>,
    /// In-memory live state shared with the rest of the session.
    persisted: Arc<Mutex<LiveState>>,
    /// Repository used when persisting the live state.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Alert sink for divergence notifications.
    alerts: Arc<AlertManager>,
    /// Metrics sink for reconciliation measurements.
    metrics: Arc<LiveMetrics>,
    /// Asset used for the cash comparison.
    reporting_currency: AssetId,
    /// Requested severity threshold; a non-positive value is replaced by
    /// `ReconciliationContext::new` with its built-in minimum.
    threshold: Decimal,
}
1175
1176impl ReconciliationContext {
1177 fn new(config: ReconciliationContextConfig) -> Self {
1178 let ReconciliationContextConfig {
1179 client,
1180 portfolio,
1181 persisted,
1182 state_repo,
1183 alerts,
1184 metrics,
1185 reporting_currency,
1186 threshold,
1187 } = config;
1188 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
1190 min_threshold
1191 } else {
1192 threshold
1193 };
1194 Self {
1195 client,
1196 portfolio,
1197 persisted,
1198 state_repo,
1199 alerts,
1200 metrics,
1201 reporting_currency,
1202 threshold,
1203 }
1204 }
1205}
1206
1207fn spawn_reconciliation_loop(
1208 ctx: Arc<ReconciliationContext>,
1209 shutdown: ShutdownSignal,
1210 interval: Duration,
1211) -> JoinHandle<()> {
1212 tokio::spawn(async move {
1213 while shutdown.sleep(interval).await {
1214 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
1215 error!(error = %err, "periodic state reconciliation failed");
1216 }
1217 }
1218 })
1219}
1220
1221async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
1222 info!("running state reconciliation");
1223 let remote_positions = ctx
1224 .client
1225 .positions()
1226 .await
1227 .context("failed to fetch remote positions")?;
1228 let remote_balances = ctx
1229 .client
1230 .account_balances()
1231 .await
1232 .context("failed to fetch remote balances")?;
1233 let (local_positions, local_cash) = {
1234 let guard = ctx.portfolio.lock().await;
1235 (guard.positions(), guard.cash())
1236 };
1237
1238 let remote_map = positions_to_map(remote_positions);
1239 let local_map = positions_to_map(local_positions);
1240 let mut tracked_symbols: HashSet<Symbol> = HashSet::new();
1241 tracked_symbols.extend(remote_map.keys().cloned());
1242 tracked_symbols.extend(local_map.keys().cloned());
1243
1244 let mut severe_findings = Vec::new();
1245 for symbol in tracked_symbols {
1246 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
1247 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
1248 let diff = (local_qty - remote_qty).abs();
1249 let diff_value = diff.to_f64().unwrap_or(0.0);
1250 let symbol_name = symbol.code().to_string();
1251 ctx.metrics.update_position_diff(&symbol_name, diff_value);
1252 if diff > Decimal::ZERO {
1253 warn!(
1254 symbol = %symbol_name,
1255 local = %local_qty,
1256 remote = %remote_qty,
1257 diff = %diff,
1258 "position mismatch detected during reconciliation"
1259 );
1260 let pct = normalize_diff(diff, remote_qty);
1261 if pct >= ctx.threshold {
1262 error!(
1263 symbol = %symbol_name,
1264 local = %local_qty,
1265 remote = %remote_qty,
1266 diff = %diff,
1267 pct = %pct,
1268 "position mismatch exceeds threshold"
1269 );
1270 severe_findings.push(format!(
1271 "{symbol_name} local={local_qty} remote={remote_qty} diff={diff}"
1272 ));
1273 }
1274 }
1275 }
1276
1277 let reporting = ctx.reporting_currency;
1278 let reporting_label = reporting.to_string();
1279 let remote_cash = remote_balances
1280 .iter()
1281 .find(|balance| balance.asset == reporting)
1282 .map(|balance| balance.available)
1283 .unwrap_or_else(|| Decimal::ZERO);
1284 let cash_diff = (remote_cash - local_cash).abs();
1285 ctx.metrics
1286 .update_balance_diff(&reporting_label, cash_diff.to_f64().unwrap_or(0.0));
1287 if cash_diff > Decimal::ZERO {
1288 warn!(
1289 currency = %reporting_label,
1290 local = %local_cash,
1291 remote = %remote_cash,
1292 diff = %cash_diff,
1293 "balance mismatch detected during reconciliation"
1294 );
1295 let pct = normalize_diff(cash_diff, remote_cash);
1296 if pct >= ctx.threshold {
1297 error!(
1298 currency = %reporting_label,
1299 local = %local_cash,
1300 remote = %remote_cash,
1301 diff = %cash_diff,
1302 pct = %pct,
1303 "balance mismatch exceeds threshold"
1304 );
1305 severe_findings.push(format!(
1306 "{reporting_label} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
1307 ));
1308 }
1309 }
1310
1311 if severe_findings.is_empty() {
1312 info!("state reconciliation complete with no critical divergence");
1313 return Ok(());
1314 }
1315
1316 let alert_body = severe_findings.join("; ");
1317 ctx.alerts
1318 .notify("State reconciliation divergence", &alert_body)
1319 .await;
1320 enforce_liquidate_only(ctx).await;
1321 Ok(())
1322}
1323
1324async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
1325 let snapshot = {
1326 let mut guard = ctx.portfolio.lock().await;
1327 if !guard.set_liquidate_only(true) {
1328 return;
1329 }
1330 info!("entering liquidate-only mode due to reconciliation divergence");
1331 guard.snapshot()
1332 };
1333 {
1334 let mut state = ctx.persisted.lock().await;
1335 state.portfolio = Some(snapshot);
1336 }
1337 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1338 warn!(error = %err, "failed to persist liquidate-only transition");
1339 }
1340}
1341
1342fn positions_to_map(positions: Vec<Position>) -> HashMap<Symbol, Decimal> {
1343 let mut map = HashMap::new();
1344 for position in positions {
1345 map.insert(position.symbol, position_signed_qty(&position));
1346 }
1347 map
1348}
1349
1350fn position_signed_qty(position: &Position) -> Decimal {
1351 match position.side {
1352 Some(Side::Buy) => position.quantity,
1353 Some(Side::Sell) => -position.quantity,
1354 None => Decimal::ZERO,
1355 }
1356}
1357
1358fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1359 if diff <= Decimal::ZERO {
1360 Decimal::ZERO
1361 } else {
1362 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1363 diff / denominator
1364 }
1365}
1366
1367fn build_exchange_payload(
1368 exchange: &ExchangeConfig,
1369 settings: &LiveSessionSettings,
1370 name: &str,
1371) -> Value {
1372 let mut payload = serde_json::Map::new();
1373 payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1374 payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1375 payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1376 payload.insert(
1377 "api_secret".into(),
1378 Value::String(exchange.api_secret.clone()),
1379 );
1380 payload.insert(
1381 "category".into(),
1382 Value::String(settings.category.as_path().to_string()),
1383 );
1384 payload.insert("exchange".into(), Value::String(name.to_string()));
1385 payload.insert(
1386 "orderbook_depth".into(),
1387 Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1388 );
1389 if let Value::Object(extra) = exchange.params.clone() {
1390 for (key, value) in extra {
1391 payload.insert(key, value);
1392 }
1393 }
1394 Value::Object(payload)
1395}
1396
/// Most recent market observations kept per symbol.
#[derive(Default)]
struct MarketSnapshot {
    /// Last trade price; set from ticks and also from candle closes.
    last_trade: Option<Price>,
    /// Exchange timestamp of the last tick that updated `last_trade`.
    last_trade_ts: Option<DateTime<Utc>>,
    /// Most recent candle received for the symbol.
    last_candle: Option<Candle>,
}
1403
1404impl MarketSnapshot {
1405 fn price(&self) -> Option<Price> {
1406 self.last_trade
1407 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1408 }
1409}
1410
/// Cloneable shutdown handle combining a sticky flag with a wake-up notifier.
pub struct ShutdownSignal {
    /// Set to `true` once shutdown has been requested; never reset in this file's visible code.
    flag: Arc<AtomicBool>,
    /// Used to wake tasks currently waiting for shutdown.
    notify: Arc<Notify>,
}
1415
1416impl ShutdownSignal {
1417 pub fn new() -> Self {
1418 let flag = Arc::new(AtomicBool::new(false));
1419 let notify = Arc::new(Notify::new());
1420 let flag_clone = flag.clone();
1421 let notify_clone = notify.clone();
1422 tokio::spawn(async move {
1423 if tokio::signal::ctrl_c().await.is_ok() {
1424 flag_clone.store(true, Ordering::SeqCst);
1425 notify_clone.notify_waiters();
1426 }
1427 });
1428 Self { flag, notify }
1429 }
1430
1431 pub fn trigger(&self) {
1432 self.flag.store(true, Ordering::SeqCst);
1433 self.notify.notify_waiters();
1434 }
1435
1436 pub fn triggered(&self) -> bool {
1437 self.flag.load(Ordering::SeqCst)
1438 }
1439
1440 pub async fn wait(&self) {
1441 if self.triggered() {
1442 return;
1443 }
1444 self.notify.notified().await;
1445 }
1446
1447 async fn sleep(&self, duration: Duration) -> bool {
1448 tokio::select! {
1449 _ = tokio::time::sleep(duration) => true,
1450 _ = self.notify.notified() => false,
1451 }
1452 }
1453}
1454
1455impl Default for ShutdownSignal {
1456 fn default() -> Self {
1457 Self::new()
1458 }
1459}
1460
1461impl Clone for ShutdownSignal {
1462 fn clone(&self) -> Self {
1463 Self {
1464 flag: self.flag.clone(),
1465 notify: self.notify.clone(),
1466 }
1467 }
1468}
1469
/// Spawns the four event-bus subscriber tasks and returns their join handles.
///
/// Each task subscribes to `bus` independently and handles a subset of events:
/// 1. market data (`Tick`, `Candle`, `OrderBook`),
/// 2. strategy signals (`Signal`),
/// 3. fills (`Fill`),
/// 4. order updates (`OrderUpdate`).
///
/// When a `recorder` is supplied, each event payload is recorded before its
/// handler runs. Handler errors are logged as warnings and do not end the
/// task. A task exits when the bus reports `Closed`; a `Lagged` receive is
/// logged and skipped.
#[allow(clippy::too_many_arguments)]
fn spawn_event_subscribers(
    bus: Arc<EventBus>,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    orchestrator: Arc<OrderOrchestrator>,
    portfolio: Arc<Mutex<Portfolio>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    persisted: Arc<Mutex<LiveState>>,
    exec_backend: ExecutionBackend,
    recorder: Option<RecorderHandle>,
    last_data_timestamp: Arc<AtomicI64>,
    driver: Arc<String>,
    market_registry: Arc<MarketRegistry>,
) -> Vec<JoinHandle<()>> {
    let mut handles = Vec::new();
    let market_recorder = recorder.clone();

    // --- Task 1: market data (ticks, candles, order books) ---
    // Clone the shared handles this task needs; the originals stay available
    // for the tasks spawned further down.
    let market_bus = bus.clone();
    let market_strategy = strategy.clone();
    let market_ctx = strategy_ctx.clone();
    let market_metrics = metrics.clone();
    let market_alerts = alerts.clone();
    let market_state = state_repo.clone();
    let market_persisted = persisted.clone();
    let market_portfolio = portfolio.clone();
    let market_snapshot = market.clone();
    let orchestrator_clone = orchestrator.clone();
    let market_data_tracker = last_data_timestamp.clone();
    let market_catalog = market_registry.clone();
    let driver_clone = driver.clone();
    handles.push(tokio::spawn(async move {
        let recorder = market_recorder;
        let mut stream = market_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Tick(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_tick(evt.tick.clone());
                    }
                    if let Err(err) = process_tick_event(
                        evt.tick,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                        market_catalog.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "tick handler failed");
                    }
                }
                Ok(Event::Candle(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_candle(evt.candle.clone());
                    }
                    if let Err(err) = process_candle_event(
                        evt.candle,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        orchestrator_clone.clone(),
                        exec_backend,
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                        market_catalog.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "candle handler failed");
                    }
                }
                Ok(Event::OrderBook(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_order_book(evt.order_book.clone());
                    }
                    if let Err(err) = process_order_book_event(
                        evt.order_book,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                        driver_clone.clone(),
                        market_catalog.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order book handler failed");
                    }
                }
                // Non-market events are handled by the other subscriber tasks.
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "market subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Task 2: strategy signals ---
    let exec_bus = bus.clone();
    let exec_portfolio = portfolio.clone();
    let exec_market = market.clone();
    let exec_persisted = persisted.clone();
    let exec_alerts = alerts.clone();
    let exec_metrics = metrics.clone();
    let exec_orchestrator = orchestrator.clone();
    let exec_recorder = recorder.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = exec_orchestrator.clone();
        let recorder = exec_recorder;
        let mut stream = exec_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Signal(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_signal(evt.signal.clone());
                    }
                    // `market_registry` itself is moved into this task here.
                    if let Err(err) = process_signal_event(
                        evt.signal,
                        orchestrator.clone(),
                        exec_portfolio.clone(),
                        exec_market.clone(),
                        exec_persisted.clone(),
                        exec_alerts.clone(),
                        exec_metrics.clone(),
                        market_registry.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "signal handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "signal subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Task 3: fills ---
    let fill_bus = bus.clone();
    let fill_state = state_repo.clone();
    let fill_orchestrator = orchestrator.clone();
    let fill_persisted = persisted.clone();
    let fill_alerts = alerts.clone();
    let fill_recorder = recorder.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = fill_orchestrator.clone();
        let persisted = fill_persisted.clone();
        let recorder = fill_recorder;
        let mut stream = fill_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Fill(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_fill(evt.fill.clone());
                    }
                    // The original `portfolio`, `strategy`, `strategy_ctx` and
                    // `metrics` handles are moved into this task.
                    if let Err(err) = process_fill_event(
                        evt.fill,
                        portfolio.clone(),
                        strategy.clone(),
                        strategy_ctx.clone(),
                        orchestrator.clone(),
                        metrics.clone(),
                        fill_alerts.clone(),
                        fill_state.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = ?err, "fill handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "fill subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Task 4: order updates ---
    let order_bus = bus.clone();
    let order_persisted = persisted.clone();
    let order_alerts = alerts.clone();
    let order_orchestrator = orchestrator.clone();
    // Last user of the recorder, so it is moved rather than cloned.
    let order_recorder = recorder;
    handles.push(tokio::spawn(async move {
        let orchestrator = order_orchestrator.clone();
        let persisted = order_persisted.clone();
        let recorder = order_recorder;
        let mut stream = order_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::OrderUpdate(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_order(evt.order.clone());
                    }
                    if let Err(err) = process_order_update_event(
                        evt.order,
                        orchestrator.clone(),
                        order_alerts.clone(),
                        state_repo.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order update handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "order subscriber lagged");
                    continue;
                }
            }
        }
    }));

    handles
}
1716
/// Handles a single trade tick.
///
/// Steps, in order: update metrics and the shared last-data timestamp, send
/// an alert heartbeat, record the trade in the per-symbol market snapshot,
/// refresh the portfolio with the new price, store the price in the persisted
/// state, forward the tick to the strategy, and publish any signals the
/// strategy produced.
///
/// If the portfolio switches into liquidate-only mode during the price
/// refresh, its snapshot is persisted and a liquidate-only alert is sent.
///
/// # Errors
///
/// Returns an error when persisting the state fails (only attempted on a
/// liquidate-only transition) or when the strategy's `on_tick` fails.
#[allow(clippy::too_many_arguments)]
async fn process_tick_event(
    tick: Tick,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
    portfolio: Arc<Mutex<Portfolio>>,
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    persisted: Arc<Mutex<LiveState>>,
    bus: Arc<EventBus>,
    last_data_timestamp: Arc<AtomicI64>,
    market_registry: Arc<MarketRegistry>,
) -> Result<()> {
    metrics.inc_tick();
    metrics.update_staleness(0.0);
    // The metric records local wall-clock receipt time; the atomic below
    // stores the exchange timestamp carried by the tick.
    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
    last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
    alerts.heartbeat().await;
    {
        // Only symbols already present in the snapshot map are updated.
        let mut guard = market.lock().await;
        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
            snapshot.last_trade = Some(tick.price);
            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
        }
    }
    let mut drawdown_triggered = false;
    let mut snapshot_on_trigger = None;
    {
        let mut guard = portfolio.lock().await;
        let was_liquidate_only = guard.liquidate_only();
        match guard.update_market_data(tick.symbol, tick.price) {
            Ok(_) => {
                // Detect a false -> true flip of liquidate-only caused by this update.
                if !was_liquidate_only && guard.liquidate_only() {
                    drawdown_triggered = true;
                    snapshot_on_trigger = Some(guard.snapshot());
                }
            }
            Err(err) => {
                // A failed refresh is logged only; tick processing continues.
                warn!(
                    symbol = %tick.symbol,
                    error = %err,
                    "failed to refresh market data"
                );
            }
        }
    }
    {
        let mut state = persisted.lock().await;
        state.last_prices.insert(tick.symbol, tick.price);
        if drawdown_triggered {
            if let Some(snapshot) = snapshot_on_trigger.take() {
                state.portfolio = Some(snapshot);
            }
        }
    }
    if drawdown_triggered {
        // NOTE(review): a persistence error returns early here, which skips
        // both the alert below and the strategy callback — confirm intended.
        persist_state(
            state_repo.clone(),
            persisted.clone(),
            Some(strategy.clone()),
        )
        .await?;
        alert_liquidate_only(alerts.clone()).await;
    }
    {
        // Lock order: strategy context first, then the strategy itself.
        let mut ctx = strategy_ctx.lock().await;
        ctx.push_tick(tick.clone());
        let lock_start = Instant::now();
        let mut strat = strategy.lock().await;
        log_strategy_lock("tick", lock_start.elapsed());
        let call_start = Instant::now();
        strat
            .on_tick(&ctx, &tick)
            .await
            .context("strategy failure on tick event")?;
        log_strategy_call("tick", call_start.elapsed());
    }
    // Both locks are released before signals are drained and published.
    emit_signals(
        strategy.clone(),
        bus.clone(),
        metrics.clone(),
        market_registry.clone(),
    )
    .await;
    debug!(symbol = %tick.symbol, price = %tick.price, "completed tick processing");
    Ok(())
}
1806
/// Handles a single candle.
///
/// Steps, in order: update metrics (including the close price gauge) and the
/// shared last-data timestamp, send an alert heartbeat, store the candle in
/// the per-symbol market snapshot, feed the close price to the paper
/// execution client when running on the paper backend, refresh the portfolio,
/// forward the candle to the strategy, persist the state, refresh the
/// orchestrator's risk context for the symbol, and publish strategy signals.
///
/// If the portfolio switches into liquidate-only mode during the price
/// refresh, its snapshot is stored in the persisted state and an alert is sent.
///
/// # Errors
///
/// Returns an error when the strategy's `on_candle` fails or when persisting
/// the state fails.
#[allow(clippy::too_many_arguments)]
async fn process_candle_event(
    candle: Candle,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
    portfolio: Arc<Mutex<Portfolio>>,
    orchestrator: Arc<OrderOrchestrator>,
    exec_backend: ExecutionBackend,
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    persisted: Arc<Mutex<LiveState>>,
    bus: Arc<EventBus>,
    last_data_timestamp: Arc<AtomicI64>,
    market_registry: Arc<MarketRegistry>,
) -> Result<()> {
    metrics.inc_candle();
    metrics.update_staleness(0.0);
    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
    last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
    alerts.heartbeat().await;
    let candle_label = candle.symbol.code().to_string();
    // A close that cannot be represented as f64 is reported as 0.0.
    metrics.update_price(&candle_label, candle.close.to_f64().unwrap_or(0.0));
    {
        // Only symbols already present in the snapshot map are updated; the
        // candle close also overwrites the last trade price.
        let mut guard = market.lock().await;
        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
            snapshot.last_candle = Some(candle.clone());
            snapshot.last_trade = Some(candle.close);
        }
    }
    if exec_backend.is_paper() {
        // Push the close price into the paper client when the execution
        // client is actually a `PaperExecutionClient`.
        let client = orchestrator.execution_engine().client();
        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
            paper.update_price(&candle.symbol, candle.close);
        }
    }
    let mut candle_drawdown_triggered = false;
    let mut candle_snapshot = None;
    {
        let mut guard = portfolio.lock().await;
        let was_liquidate_only = guard.liquidate_only();
        match guard.update_market_data(candle.symbol, candle.close) {
            Ok(_) => {
                // Detect a false -> true flip of liquidate-only caused by this update.
                if !was_liquidate_only && guard.liquidate_only() {
                    candle_drawdown_triggered = true;
                    candle_snapshot = Some(guard.snapshot());
                }
            }
            Err(err) => {
                // A failed refresh is logged only; candle processing continues.
                warn!(
                    symbol = %candle.symbol,
                    error = %err,
                    "failed to refresh market data"
                );
            }
        }
    }
    if candle_drawdown_triggered {
        // The snapshot is written to disk by the unconditional
        // `persist_state` call further down.
        if let Some(snapshot) = candle_snapshot.take() {
            let mut persisted_guard = persisted.lock().await;
            persisted_guard.portfolio = Some(snapshot);
        }
        alert_liquidate_only(alerts.clone()).await;
    }
    {
        // Lock order: strategy context first, then the strategy itself.
        let mut ctx = strategy_ctx.lock().await;
        ctx.push_candle(candle.clone());
        let lock_start = Instant::now();
        let mut strat = strategy.lock().await;
        log_strategy_lock("candle", lock_start.elapsed());
        let call_start = Instant::now();
        strat
            .on_candle(&ctx, &candle)
            .await
            .context("strategy failure on candle event")?;
        log_strategy_call("candle", call_start.elapsed());
    }
    {
        let mut snapshot = persisted.lock().await;
        snapshot.last_candle_ts = Some(candle.timestamp);
        snapshot.last_prices.insert(candle.symbol, candle.close);
    }
    persist_state(
        state_repo.clone(),
        persisted.clone(),
        Some(strategy.clone()),
    )
    .await?;
    // Rebuild the risk context after the portfolio and prices were updated.
    let ctx = shared_risk_context(
        candle.symbol,
        &portfolio,
        &market,
        &persisted,
        &market_registry,
    )
    .await;
    orchestrator.update_risk_context(candle.symbol, ctx);
    emit_signals(
        strategy.clone(),
        bus.clone(),
        metrics.clone(),
        market_registry.clone(),
    )
    .await;
    debug!(symbol = %candle.symbol, close = %candle.close, "completed candle processing");
    Ok(())
}
1915
/// Handles a single order book snapshot/update.
///
/// Resets staleness, sends an alert heartbeat, stores the book timestamp,
/// verifies the book checksum against the exchange-provided one when present,
/// forwards the book to the strategy, and publishes any resulting signals.
///
/// `_market` is accepted but unused.
///
/// # Errors
///
/// Returns an error when the strategy's `on_order_book` fails.
#[allow(clippy::too_many_arguments)]
async fn process_order_book_event(
    mut book: OrderBook,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    _market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
    bus: Arc<EventBus>,
    last_data_timestamp: Arc<AtomicI64>,
    driver: Arc<String>,
    market_registry: Arc<MarketRegistry>,
) -> Result<()> {
    metrics.update_staleness(0.0);
    alerts.heartbeat().await;
    last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
    let driver_name = driver.as_str();
    // Use the checksum already attached to the book, or compute and cache one.
    let local_checksum = if let Some(cs) = book.local_checksum {
        cs
    } else {
        let computed = book.computed_checksum(None);
        book.local_checksum = Some(computed);
        computed
    };
    let symbol_label = book.symbol.code().to_string();
    if let Some(expected) = book.exchange_checksum {
        // A mismatch is counted and alerted on; processing still continues.
        if expected != local_checksum {
            metrics.inc_checksum_mismatch(driver_name, &symbol_label);
            alerts
                .order_book_checksum_mismatch(driver_name, &symbol_label, expected, local_checksum)
                .await;
        }
    }
    {
        // Lock order: strategy context first, then the strategy itself.
        let mut ctx = strategy_ctx.lock().await;
        ctx.push_order_book(book.clone());
        let lock_start = Instant::now();
        let mut strat = strategy.lock().await;
        log_strategy_lock("order_book", lock_start.elapsed());
        let call_start = Instant::now();
        strat
            .on_order_book(&ctx, &book)
            .await
            .context("strategy failure on order book")?;
        log_strategy_call("order_book", call_start.elapsed());
    }
    emit_signals(
        strategy.clone(),
        bus.clone(),
        metrics.clone(),
        market_registry.clone(),
    )
    .await;
    Ok(())
}
1971
1972#[allow(clippy::too_many_arguments)]
1973async fn process_signal_event(
1974 signal: Signal,
1975 orchestrator: Arc<OrderOrchestrator>,
1976 portfolio: Arc<Mutex<Portfolio>>,
1977 market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1978 persisted: Arc<Mutex<LiveState>>,
1979 alerts: Arc<AlertManager>,
1980 metrics: Arc<LiveMetrics>,
1981 market_registry: Arc<MarketRegistry>,
1982) -> Result<()> {
1983 let ctx = shared_risk_context(
1984 signal.symbol,
1985 &portfolio,
1986 &market,
1987 &persisted,
1988 &market_registry,
1989 )
1990 .await;
1991 orchestrator.update_risk_context(signal.symbol, ctx);
1992 match orchestrator.on_signal(&signal, &ctx).await {
1993 Ok(()) => {
1994 alerts.reset_order_failures().await;
1995 }
1996 Err(err) => {
1997 metrics.inc_order_failure();
1998 metrics.inc_router_failure("orchestrator");
1999 alerts
2000 .order_failure(&format!("orchestrator error: {err}"))
2001 .await;
2002 }
2003 }
2004 Ok(())
2005}
2006
2007fn log_strategy_lock(event: &str, wait: Duration) {
2008 let wait_ms = wait.as_secs_f64() * 1000.0;
2009 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
2010 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
2011 } else {
2012 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
2013 }
2014}
2015
2016fn log_strategy_call(event: &str, elapsed: Duration) {
2017 let duration_ms = elapsed.as_secs_f64() * 1000.0;
2018 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
2019 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
2020 } else {
2021 trace!(target: "strategy", event, duration_ms, "strategy call completed");
2022 }
2023}
2024
2025#[allow(clippy::too_many_arguments)]
2026async fn process_fill_event(
2027 fill: Fill,
2028 portfolio: Arc<Mutex<Portfolio>>,
2029 strategy: Arc<Mutex<Box<dyn Strategy>>>,
2030 strategy_ctx: Arc<Mutex<StrategyContext>>,
2031 orchestrator: Arc<OrderOrchestrator>,
2032 metrics: Arc<LiveMetrics>,
2033 alerts: Arc<AlertManager>,
2034 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2035 persisted: Arc<Mutex<LiveState>>,
2036) -> Result<()> {
2037 let mut drawdown_triggered = false;
2038 {
2039 let mut guard = portfolio.lock().await;
2040 let was_liquidate_only = guard.liquidate_only();
2041 guard
2042 .apply_fill(&fill)
2043 .with_context(|| format!("Failed to apply fill to portfolio for {}", fill.symbol))?;
2044 if !was_liquidate_only && guard.liquidate_only() {
2045 drawdown_triggered = true;
2046 }
2047 let snapshot = guard.snapshot();
2048 let mut persisted_guard = persisted.lock().await;
2049 persisted_guard.portfolio = Some(snapshot);
2050 }
2051 {
2052 let positions = {
2053 let guard = portfolio.lock().await;
2054 guard.positions()
2055 };
2056 let mut ctx = strategy_ctx.lock().await;
2057 ctx.update_positions(positions);
2058 }
2059 orchestrator.on_fill(&fill).await.ok();
2060 {
2061 let ctx = strategy_ctx.lock().await;
2062 let lock_start = Instant::now();
2063 let mut strat = strategy.lock().await;
2064 log_strategy_lock("fill", lock_start.elapsed());
2065 let call_start = Instant::now();
2066 strat
2067 .on_fill(&ctx, &fill)
2068 .await
2069 .context("Strategy failed on fill event")?;
2070 log_strategy_call("fill", call_start.elapsed());
2071 }
2072 let equity = {
2073 let guard = portfolio.lock().await;
2074 guard.equity()
2075 };
2076 if let Some(value) = equity.to_f64() {
2077 metrics.update_equity(value);
2078 }
2079 alerts.update_equity(equity).await;
2080 metrics.inc_order();
2081 alerts
2082 .notify(
2083 "Order Filled",
2084 &format!(
2085 "order filled: {}@{} ({})",
2086 fill.fill_quantity,
2087 fill.fill_price,
2088 match fill.side {
2089 Side::Buy => "buy",
2090 Side::Sell => "sell",
2091 }
2092 ),
2093 )
2094 .await;
2095 if drawdown_triggered {
2096 alert_liquidate_only(alerts.clone()).await;
2097 }
2098 persist_state(
2099 state_repo.clone(),
2100 persisted.clone(),
2101 Some(strategy.clone()),
2102 )
2103 .await?;
2104 Ok(())
2105}
2106
2107async fn process_order_update_event(
2108 order: Order,
2109 orchestrator: Arc<OrderOrchestrator>,
2110 alerts: Arc<AlertManager>,
2111 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2112 persisted: Arc<Mutex<LiveState>>,
2113) -> Result<()> {
2114 orchestrator.on_order_update(&order).await;
2115 if matches!(order.status, OrderStatus::Rejected) {
2116 error!(
2117 order_id = %order.id,
2118 symbol = %order.request.symbol,
2119 "order rejected by exchange"
2120 );
2121 alerts.order_failure("order rejected by exchange").await;
2122 alerts
2123 .notify(
2124 "Order rejected",
2125 &format!(
2126 "Order {} for {} was rejected",
2127 order.id, order.request.symbol
2128 ),
2129 )
2130 .await;
2131 }
2132 {
2133 let mut snapshot = persisted.lock().await;
2134 let mut found = false;
2135 for existing in &mut snapshot.open_orders {
2136 if existing.id == order.id {
2137 *existing = order.clone();
2138 found = true;
2139 break;
2140 }
2141 }
2142 if !found {
2143 snapshot.open_orders.push(order.clone());
2144 }
2145 if matches!(
2146 order.status,
2147 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
2148 ) {
2149 snapshot.open_orders.retain(|o| o.id != order.id);
2150 }
2151 }
2152 persist_state(state_repo, persisted, None).await?;
2153 Ok(())
2154}
2155
2156async fn emit_signals(
2157 strategy: Arc<Mutex<Box<dyn Strategy>>>,
2158 bus: Arc<EventBus>,
2159 metrics: Arc<LiveMetrics>,
2160 market_registry: Arc<MarketRegistry>,
2161) {
2162 let signals = {
2163 let mut strat = strategy.lock().await;
2164 let drained = strat.drain_signals();
2165 debug!(count = drained.len(), "strategy drained signals");
2166 drained
2167 };
2168 if signals.is_empty() {
2169 return;
2170 }
2171 metrics.inc_signals(signals.len());
2172 let mut normalized = signals;
2173 normalize_group_quantities(&mut normalized, &market_registry);
2174 for signal in normalized {
2175 debug!(id = %signal.id, symbol = %signal.symbol, kind = ?signal.kind, "publishing signal event");
2176 bus.publish(Event::Signal(SignalEvent { signal }));
2177 }
2178}
2179
2180fn normalize_group_quantities(signals: &mut [Signal], registry: &MarketRegistry) {
2181 use std::collections::HashMap;
2182
2183 assign_implicit_group_ids(signals);
2184
2185 let mut groups: HashMap<Uuid, Vec<usize>> = HashMap::new();
2186 for (idx, signal) in signals.iter().enumerate() {
2187 if let Some(group_id) = signal.group_id {
2188 groups.entry(group_id).or_default().push(idx);
2189 }
2190 }
2191 for indices in groups.values() {
2192 if indices.len() < 2 {
2193 continue;
2194 }
2195 let mut quantity = indices
2196 .iter()
2197 .filter_map(|idx| signals[*idx].quantity)
2198 .find(|qty| *qty > Decimal::ZERO);
2199 let mut step = Decimal::ZERO;
2200 for idx in indices {
2201 let symbol = signals[*idx].symbol;
2202 let Some(instr) = registry.get(symbol) else {
2203 quantity = None;
2204 break;
2205 };
2206 if instr.lot_size > step {
2207 step = instr.lot_size;
2208 }
2209 }
2210 let Some(mut qty) = quantity else {
2211 continue;
2212 };
2213 if step > Decimal::ZERO {
2214 qty = (qty / step).floor() * step;
2215 }
2216 if qty <= Decimal::ZERO {
2217 continue;
2218 }
2219 for idx in indices {
2220 signals[*idx].quantity = Some(qty);
2221 }
2222 }
2223}
2224
2225fn assign_implicit_group_ids(signals: &mut [Signal]) {
2226 use std::collections::HashMap;
2227
2228 let mut note_groups: HashMap<String, Vec<usize>> = HashMap::new();
2229 for (idx, signal) in signals.iter().enumerate() {
2230 if signal.group_id.is_some() {
2231 continue;
2232 }
2233 if let Some(note) = signal.note.as_deref() {
2234 if !note.is_empty() {
2235 note_groups.entry(note.to_string()).or_default().push(idx);
2236 }
2237 }
2238 }
2239 for indices in note_groups.values() {
2240 if indices.len() < 2 {
2241 continue;
2242 }
2243 let group = Uuid::new_v4();
2244 for idx in indices {
2245 signals[*idx].group_id = Some(group);
2246 }
2247 }
2248
2249 let mut untagged: Vec<usize> = signals
2250 .iter()
2251 .enumerate()
2252 .filter(|(_, signal)| signal.group_id.is_none())
2253 .map(|(idx, _)| idx)
2254 .collect();
2255 if untagged.len() == 2 {
2256 let a = signals[untagged[0]].kind;
2257 let b = signals[untagged[1]].kind;
2258 if signal_kind_family(a) == signal_kind_family(b) && signal_kind_family(a).is_some() {
2259 let group = Uuid::new_v4();
2260 for idx in untagged.drain(..) {
2261 signals[idx].group_id = Some(group);
2262 }
2263 }
2264 }
2265}
2266
2267fn signal_kind_family(kind: SignalKind) -> Option<u8> {
2268 match kind {
2269 SignalKind::EnterLong | SignalKind::EnterShort => Some(0),
2270 SignalKind::ExitLong | SignalKind::ExitShort | SignalKind::Flatten => Some(1),
2271 }
2272}
2273
2274async fn persist_state(
2275 repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2276 persisted: Arc<Mutex<LiveState>>,
2277 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
2278) -> Result<()> {
2279 if let Some(strat_lock) = strategy {
2280 let strat = strat_lock.lock().await;
2282 if let Ok(json_state) = strat.snapshot() {
2283 let mut guard = persisted.lock().await;
2284 guard.strategy_state = Some(json_state);
2285 } else {
2286 warn!("failed to snapshot strategy state");
2287 }
2288 }
2289
2290 let snapshot = {
2291 let guard = persisted.lock().await;
2292 guard.clone()
2293 };
2294 tokio::task::spawn_blocking(move || repo.save(&snapshot))
2295 .await
2296 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
2297 .map_err(|err| anyhow!(err.to_string()))
2298}
2299
2300async fn shared_risk_context(
2301 symbol: Symbol,
2302 portfolio: &Arc<Mutex<Portfolio>>,
2303 market: &Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
2304 persisted: &Arc<Mutex<LiveState>>,
2305 registry: &Arc<MarketRegistry>,
2306) -> RiskContext {
2307 let instrument = registry.get(symbol);
2308 let (instrument_kind, base_asset, quote_asset, settlement_asset) = instrument
2309 .map(|instrument| {
2310 (
2311 Some(instrument.kind),
2312 instrument.base,
2313 instrument.quote,
2314 instrument.settlement_currency,
2315 )
2316 })
2317 .unwrap_or((
2318 None,
2319 AssetId::unspecified(),
2320 AssetId::unspecified(),
2321 AssetId::unspecified(),
2322 ));
2323 let (
2324 signed_qty,
2325 equity,
2326 venue_equity,
2327 liquidate_only,
2328 base_available,
2329 quote_available,
2330 settlement_available,
2331 ) = {
2332 let guard = portfolio.lock().await;
2333 (
2334 guard.signed_position_qty(symbol),
2335 guard.equity(),
2336 guard.exchange_equity(symbol.exchange),
2337 guard.liquidate_only(),
2338 guard
2339 .balance(base_asset)
2340 .map(|cash| cash.quantity)
2341 .unwrap_or_default(),
2342 guard
2343 .balance(quote_asset)
2344 .map(|cash| cash.quantity)
2345 .unwrap_or_default(),
2346 guard
2347 .balance(settlement_asset)
2348 .map(|cash| cash.quantity)
2349 .unwrap_or_default(),
2350 )
2351 };
2352 let observed_price = {
2353 let guard = market.lock().await;
2354 guard.get(&symbol).and_then(|snapshot| snapshot.price())
2355 };
2356 let last_price = if let Some(price) = observed_price {
2357 price
2358 } else {
2359 let guard = persisted.lock().await;
2360 guard
2361 .last_prices
2362 .get(&symbol)
2363 .copied()
2364 .unwrap_or(Decimal::ZERO)
2365 };
2366 RiskContext {
2367 symbol,
2368 exchange: symbol.exchange,
2369 signed_position_qty: signed_qty,
2370 portfolio_equity: equity,
2371 exchange_equity: venue_equity,
2372 last_price,
2373 liquidate_only,
2374 instrument_kind,
2375 base_asset,
2376 quote_asset,
2377 settlement_asset,
2378 base_available,
2379 quote_available,
2380 settlement_available,
2381 }
2382}
2383
2384async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
2385 alerts
2386 .notify(
2387 "Max drawdown triggered",
2388 "Portfolio entered liquidate-only mode; new exposure blocked until review",
2389 )
2390 .await;
2391}
2392
2393fn spawn_connection_monitor(
2394 shutdown: ShutdownSignal,
2395 flag: Arc<AtomicBool>,
2396 metrics: Arc<LiveMetrics>,
2397 stream: &'static str,
2398) -> JoinHandle<()> {
2399 tokio::spawn(async move {
2400 loop {
2401 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
2402 if !shutdown.sleep(Duration::from_secs(5)).await {
2403 break;
2404 }
2405 }
2406 })
2407}
2408
2409fn spawn_order_timeout_monitor(
2410 orchestrator: Arc<OrderOrchestrator>,
2411 bus: Arc<EventBus>,
2412 alerts: Arc<AlertManager>,
2413 shutdown: ShutdownSignal,
2414) -> JoinHandle<()> {
2415 tokio::spawn(async move {
2416 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
2417 loop {
2418 ticker.tick().await;
2419 if shutdown.triggered() {
2420 break;
2421 }
2422 match orchestrator.poll_stale_orders().await {
2423 Ok(updates) => {
2424 for order in updates {
2425 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
2426 let message = format!(
2427 "Order {} for {} timed out after {}s",
2428 order.id,
2429 order.request.symbol,
2430 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
2431 );
2432 error!(%message);
2433 alerts.order_failure(&message).await;
2434 alerts.notify("Order timeout", &message).await;
2435 }
2436 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
2437 }
2438 }
2439 Err(err) => {
2440 warn!(error = %err, "order timeout monitor failed");
2441 }
2442 }
2443 }
2444 })
2445}
2446
2447async fn load_market_registry(
2448 client: Arc<dyn ExecutionClient>,
2449 settings: &LiveSessionSettings,
2450) -> Result<Arc<MarketRegistry>> {
2451 let mut catalog = InstrumentCatalog::new();
2452 let mut loaded_local = false;
2453 if let Some(path) = &settings.markets_file {
2454 catalog
2455 .add_file(path)
2456 .with_context(|| format!("failed to load markets from {}", path.display()))?;
2457 loaded_local = true;
2458 }
2459
2460 if !loaded_local && !settings.exec_backend.is_paper() {
2461 let instruments = client
2462 .list_instruments(settings.category.as_path())
2463 .await
2464 .context("failed to fetch instruments from execution client")?;
2465 catalog
2466 .add_instruments(instruments)
2467 .map_err(|err| anyhow!(err.to_string()))?;
2468 } else if catalog.is_empty() {
2469 return Err(anyhow!(
2470 "paper execution requires --markets-file when exchange metadata is unavailable"
2471 ));
2472 }
2473
2474 if catalog.is_empty() {
2475 return Err(anyhow!(
2476 "no market metadata available; supply --markets-file or use a live exchange"
2477 ));
2478 }
2479
2480 let registry = catalog.build().map_err(|err| anyhow!(err.to_string()))?;
2481 Ok(Arc::new(registry))
2482}
2483
/// Spawns the background task that keeps the Bybit private WebSocket stream alive.
///
/// Each connection cycle:
/// 1. connects via `tesser_bybit::ws::connect_private` and marks the stream connected;
/// 2. reconciles open orders and recent executions over REST
///    (see `reconcile_bybit_on_connect`);
/// 3. forwards `order` / `execution` topic messages to `private_tx` until the socket
///    ends or shutdown is requested;
/// 4. marks the stream disconnected and waits five seconds before reconnecting.
///
/// Both the socket read and the reconnect delay are raced against `shutdown`, so the task
/// stops promptly even when the stream is idle or the exchange is unreachable.
///
/// `symbols` is filtered down to those whose exchange matches the Bybit client's exchange
/// id (`ExchangeId::UNSPECIFIED` when `exec_client` is not a `BybitClient`).
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<Symbol>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    router: Option<Arc<RouterExecutionClient>>,
    shutdown: ShutdownSignal,
) {
    let exchange_id = exec_client
        .as_any()
        .downcast_ref::<BybitClient>()
        .map(|client| client.exchange())
        .unwrap_or(ExchangeId::UNSPECIFIED);
    let venue_symbols: Vec<Symbol> = symbols
        .iter()
        .copied()
        .filter(|symbol| symbol.exchange == exchange_id)
        .collect();
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");

                    reconcile_bybit_on_connect(
                        &exec_client,
                        &venue_symbols,
                        exchange_id,
                        &last_sync,
                        router.as_deref(),
                        &metrics,
                        &private_tx,
                    )
                    .await;

                    loop {
                        // Explicit flag check in addition to the select below, in case
                        // shutdown was requested while a message was being processed.
                        if shutdown.triggered() {
                            break;
                        }
                        let next = tokio::select! {
                            _ = shutdown.wait() => break,
                            next = socket.next() => next,
                        };
                        let Some(msg) = next else {
                            // Stream ended: fall through to the reconnect path.
                            break;
                        };
                        if let Ok(Message::Text(text)) = msg {
                            forward_bybit_private_message(
                                &text,
                                exchange_id,
                                router.as_deref(),
                                &metrics,
                                &private_tx,
                            )
                            .await;
                        }
                    }
                    if !shutdown.triggered() {
                        warn!("Bybit private WebSocket stream closed; reconnecting");
                    }
                }
                Err(e) => {
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                }
            }

            // Whether the connect failed or the stream ended, we are disconnected now.
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);

            // Back off before reconnecting; stop instead if shutdown was requested.
            if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                break;
            }
        }
    });
}

/// Replays open orders and recent executions over REST after a (re)connect.
///
/// Open orders are listed per symbol and forwarded as `BrokerEvent::OrderUpdate`. When
/// `exec_client` is a `BybitClient`, executions since `last_sync` (or the last 30 minutes
/// when it is unset) are forwarded as `BrokerEvent::Fill`. `last_sync` only advances when
/// that execution query succeeded, so a failed reconciliation is retried over the same
/// window on the next connect instead of silently skipping it.
#[cfg(feature = "bybit")]
async fn reconcile_bybit_on_connect(
    exec_client: &Arc<dyn ExecutionClient>,
    symbols: &[Symbol],
    exchange_id: ExchangeId,
    last_sync: &tokio::sync::Mutex<Option<DateTime<Utc>>>,
    router: Option<&RouterExecutionClient>,
    metrics: &LiveMetrics,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    for symbol in symbols {
        match exec_client.list_open_orders(*symbol).await {
            Ok(orders) => {
                for mut order in orders {
                    if let Some(router) = router {
                        order = router.normalize_order_event(exchange_id, order);
                    }
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send reconciled order update: {err}");
                    }
                }
            }
            Err(e) => {
                error!(
                    "failed to reconcile open orders for {}: {e}",
                    symbol.code()
                );
            }
        }
    }

    let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() else {
        return;
    };
    let since = {
        let guard = last_sync.lock().await;
        guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
    };
    match bybit.list_executions_since(since).await {
        Ok(fills) => {
            for mut fill in fills {
                if let Some(router) = router {
                    match router.normalize_fill_event(exchange_id, fill) {
                        Some(normalized) => fill = normalized,
                        None => {
                            metrics.inc_router_failure("orphan_fill");
                            continue;
                        }
                    }
                }
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send reconciled fill: {err}");
                }
            }
            *last_sync.lock().await = Some(Utc::now());
        }
        Err(e) => {
            // Keep `last_sync` untouched so the same window is queried again later.
            error!("failed to reconcile executions since {:?}: {}", since, e);
        }
    }
}

/// Decodes one private WebSocket text frame and forwards its orders or fills.
///
/// Frames that are not valid JSON, carry no `topic`, carry a topic other than `order` or
/// `execution`, or fail to deserialize are ignored. Individual entries that cannot be
/// converted are skipped; fills the router cannot attribute are counted as `orphan_fill`.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(
    text: &str,
    exchange_id: ExchangeId,
    router: Option<&RouterExecutionClient>,
    metrics: &LiveMetrics,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    enum PrivateTopic {
        Order,
        Execution,
    }

    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return;
    };
    // Resolve the topic first so the payload can be moved into `from_value` uncloned.
    let topic = match value.get("topic").and_then(|v| v.as_str()) {
        Some("order") => PrivateTopic::Order,
        Some("execution") => PrivateTopic::Execution,
        _ => return,
    };

    match topic {
        PrivateTopic::Order => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) else {
                return;
            };
            for update in msg.data {
                let Ok(mut order) = update.to_tesser_order(exchange_id, None) else {
                    continue;
                };
                if let Some(router) = router {
                    order = router.normalize_order_event(exchange_id, order);
                }
                if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                    error!("failed to send private order update: {err}");
                }
            }
        }
        PrivateTopic::Execution => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
            else {
                return;
            };
            for exec in msg.data {
                let Ok(mut fill) = exec.to_tesser_fill(exchange_id) else {
                    continue;
                };
                if let Some(router) = router {
                    match router.normalize_fill_event(exchange_id, fill) {
                        Some(normalized) => fill = normalized,
                        None => {
                            metrics.inc_router_failure("orphan_fill");
                            continue;
                        }
                    }
                }
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send private fill event: {err}");
                }
            }
        }
    }
}
2672
/// Spawns the background task that keeps the Binance user-data stream alive.
///
/// Each cycle starts a user stream to obtain a listen key, connects the WebSocket, and
/// registers a callback that forwards order updates and fills to `private_tx`. A
/// companion task calls `keepalive_user_stream` every 30 minutes. The cycle ends when the
/// listen key expires (reconnect after a five second pause) or when shutdown is requested
/// (the task exits). The task also exits immediately when `exec_client` is not a
/// `BinanceClient`.
///
/// All retry pauses go through `shutdown.sleep`, so a shutdown request ends the task even
/// while the exchange is unreachable.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    router: Option<Arc<RouterExecutionClient>>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let exchange = binance.exchange();
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // This path skips the shutdown check at the bottom of the loop, so
                    // it has to honor shutdown itself or the task would never exit.
                    if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                        return;
                    }
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    let exchange_id = exchange;
                    let router_for_event = router.clone();
                    let metrics_for_event = metrics.clone();
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(mut order) = order_from_update(exchange_id, update) {
                                if let Some(router) = &router_for_event {
                                    order = router.normalize_order_event(exchange_id, order);
                                }
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(mut fill) = fill_from_update(exchange_id, update) {
                                if let Some(router) = &router_for_event {
                                    match router.normalize_fill_event(exchange_id, fill) {
                                        Some(normalized) => fill = normalized,
                                        None => {
                                            metrics_for_event.inc_router_failure("orphan_fill");
                                            return;
                                        }
                                    }
                                }
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            // A full channel means a reconnect is already pending.
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if client.keepalive_user_stream().await.is_err() {
                                break;
                            }
                        }
                    });
                    let shutting_down = tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                            false
                        }
                        _ = shutdown.wait() => true,
                    };
                    // Same teardown for both outcomes.
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                    if shutting_down {
                        return;
                    }
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            // Pause before the next attempt; stop instead if shutdown was requested.
            if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                break;
            }
        }
    });
}
2786
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use tesser_core::OrderBookLevel;

    /// Market stream backed by in-memory queues; each call pops the next queued event
    /// and yields `None` once its queue is empty.
    struct StaticStream {
        ticks: VecDeque<Tick>,
        candles: VecDeque<Candle>,
        books: VecDeque<OrderBook>,
    }

    impl StaticStream {
        fn new(ticks: Vec<Tick>, candles: Vec<Candle>, books: Vec<OrderBook>) -> Self {
            Self {
                ticks: VecDeque::from(ticks),
                candles: VecDeque::from(candles),
                books: VecDeque::from(books),
            }
        }
    }

    #[async_trait::async_trait]
    impl LiveMarketStream for StaticStream {
        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
            let tick = self.ticks.pop_front();
            Ok(tick)
        }

        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
            let candle = self.candles.pop_front();
            Ok(candle)
        }

        async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
            let book = self.books.pop_front();
            Ok(book)
        }
    }

    /// Unit-size buy tick at `price`; the symbol is built from `exchange`.
    fn build_tick(exchange: &str, price: i64) -> Tick {
        let symbol = Symbol::from(exchange);
        let price = Decimal::from(price);
        Tick {
            symbol,
            price,
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: Utc::now(),
            received_at: Utc::now(),
        }
    }

    /// Flat one-minute candle whose open, high, low and close all equal `close`.
    fn build_candle(exchange: &str, close: i64) -> Candle {
        let level = Decimal::from(close);
        Candle {
            symbol: Symbol::from(exchange),
            interval: Interval::OneMinute,
            open: level,
            high: level,
            low: level,
            close: level,
            volume: Decimal::ONE,
            timestamp: Utc::now(),
        }
    }

    /// One-level book: bid at `price`, ask at `price + 1`, both of unit size.
    fn build_book(exchange: &str, price: i64) -> OrderBook {
        let bid = OrderBookLevel {
            price: Decimal::from(price),
            size: Decimal::ONE,
        };
        let ask = OrderBookLevel {
            price: Decimal::from(price + 1),
            size: Decimal::ONE,
        };
        OrderBook {
            symbol: Symbol::from(exchange),
            bids: vec![bid],
            asks: vec![ask],
            timestamp: Utc::now(),
            exchange_checksum: None,
            local_checksum: None,
        }
    }

    #[tokio::test]
    async fn router_market_stream_fans_in_events() {
        let shutdown = ShutdownSignal::new();
        let venue_a = Box::new(StaticStream::new(
            vec![build_tick("A", 1), build_tick("A", 2)],
            vec![build_candle("A", 10)],
            vec![build_book("A", 5)],
        ));
        let venue_b = Box::new(StaticStream::new(
            vec![build_tick("B", 3)],
            vec![build_candle("B", 20)],
            vec![build_book("B", 15)],
        ));
        let mut router = RouterMarketStream::new(
            vec![("A".into(), venue_a), ("B".into(), venue_b)],
            shutdown.clone(),
        );

        // Ticks: both from A arrive before the single one from B.
        for expected in ["A", "A", "B"] {
            let tick = router.next_tick().await.unwrap().unwrap();
            assert_eq!(tick.symbol, Symbol::from(expected));
        }

        for expected in ["A", "B"] {
            let candle = router.next_candle().await.unwrap().unwrap();
            assert_eq!(candle.symbol, Symbol::from(expected));
        }

        let first_book = router.next_order_book().await.unwrap().unwrap();
        let second_book = router.next_order_book().await.unwrap().unwrap();
        assert_eq!(first_book.bids[0].price, Decimal::from(5));
        assert_eq!(second_book.asks[0].price, Decimal::from(16));

        shutdown.trigger();
    }
}