tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, AtomicI64, Ordering},
6    Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{debug, error, info, trace, warn};
19use uuid::Uuid;
20
21use serde_json::{json, Value};
22
23fn ensure_builtin_connectors_registered() {
24    static INIT: Once = Once::new();
25    INIT.call_once(|| {
26        register_connector_factory(Arc::new(PaperFactory::default()));
27        #[cfg(feature = "bybit")]
28        register_bybit_factory();
29        #[cfg(feature = "binance")]
30        register_binance_factory();
31    });
32}
33
34#[cfg(feature = "binance")]
35use tesser_binance::{
36    fill_from_update, order_from_update, register_factory as register_binance_factory,
37    ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
38    BinanceClient,
39};
40use tesser_broker::{
41    get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
42    ConnectorStream, ConnectorStreamConfig, ExecutionClient, RouterExecutionClient,
43};
44#[cfg(feature = "bybit")]
45use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
46#[cfg(feature = "bybit")]
47use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
48use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
49use tesser_core::{
50    AccountBalance, AssetId, Candle, ExchangeId, Fill, Interval, Order, OrderBook, OrderStatus,
51    Position, Price, Quantity, Side, Signal, SignalKind, Symbol, Tick,
52};
53use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
54use tesser_events::{
55    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
56    TickEvent,
57};
58use tesser_execution::{
59    AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
60    PanicCloseConfig, PanicObserver, PreTradeRiskChecker, RiskContext, RiskLimits,
61    SqliteAlgoStateRepository, StoredAlgoState,
62};
63use tesser_journal::LmdbJournal;
64use tesser_markets::{InstrumentCatalog, MarketRegistry};
65use tesser_paper::{FeeScheduleConfig, PaperExecutionClient, PaperFactory};
66use tesser_portfolio::{
67    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
68};
69use tesser_strategy::{Strategy, StrategyContext};
70
71use crate::alerts::{AlertDispatcher, AlertManager};
72use crate::control;
73use crate::telemetry::{spawn_metrics_server, LiveMetrics};
74use crate::PublicChannel;
75
76/// Unified event type for asynchronous updates from the broker.
77#[derive(Debug)]
78pub enum BrokerEvent {
79    OrderUpdate(Order),
80    Fill(Fill),
81}
82
83struct PanicAlertHook {
84    metrics: Arc<LiveMetrics>,
85    alerts: Arc<AlertManager>,
86}
87
88impl PanicAlertHook {
89    fn new(metrics: Arc<LiveMetrics>, alerts: Arc<AlertManager>) -> Self {
90        Self { metrics, alerts }
91    }
92}
93
94impl PanicObserver for PanicAlertHook {
95    fn on_group_event(&self, group_id: Uuid, symbol: Symbol, quantity: Quantity, reason: &str) {
96        self.metrics.inc_panic_close();
97        let alerts = self.alerts.clone();
98        let title = "Execution group panic close";
99        let message = format!("Group {group_id} panic-closed {symbol} qty={quantity}: {reason}");
100        tokio::spawn(async move {
101            alerts.notify(title, &message).await;
102        });
103    }
104}
105
106#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
107#[value(rename_all = "kebab-case")]
108pub enum ExecutionBackend {
109    Paper,
110    Live,
111}
112
113impl ExecutionBackend {
114    fn is_paper(self) -> bool {
115        matches!(self, Self::Paper)
116    }
117}
118
119#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
120#[value(rename_all = "kebab-case")]
121pub enum PersistenceBackend {
122    Sqlite,
123    Lmdb,
124}
125
126impl From<PersistenceBackend> for PersistenceEngine {
127    fn from(value: PersistenceBackend) -> Self {
128        match value {
129            PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
130            PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
131        }
132    }
133}
134
135const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;
136
137pub const fn default_order_book_depth() -> usize {
138    DEFAULT_ORDER_BOOK_DEPTH
139}
140const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
141const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
142const MARKET_EVENT_TIMEOUT: Duration = Duration::from_millis(10);
143
144#[async_trait::async_trait]
145trait LiveMarketStream: Send {
146    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
147    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
148    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
149}
150
151struct FactoryStreamAdapter {
152    inner: Box<dyn ConnectorStream>,
153}
154
155impl FactoryStreamAdapter {
156    fn new(inner: Box<dyn ConnectorStream>) -> Self {
157        Self { inner }
158    }
159}
160
161#[async_trait::async_trait]
162impl LiveMarketStream for FactoryStreamAdapter {
163    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
164        self.inner.next_tick().await
165    }
166
167    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
168        self.inner.next_candle().await
169    }
170
171    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
172        self.inner.next_order_book().await
173    }
174}
175
176struct RouterMarketStream {
177    tick_rx: mpsc::Receiver<Tick>,
178    candle_rx: mpsc::Receiver<Candle>,
179    book_rx: mpsc::Receiver<OrderBook>,
180    tasks: Vec<JoinHandle<()>>,
181}
182
183impl RouterMarketStream {
184    fn new(streams: Vec<(String, Box<dyn LiveMarketStream>)>, shutdown: ShutdownSignal) -> Self {
185        let (tick_tx, tick_rx) = mpsc::channel(512);
186        let (candle_tx, candle_rx) = mpsc::channel(512);
187        let (book_tx, book_rx) = mpsc::channel(512);
188        let mut tasks = Vec::new();
189        for (name, mut stream) in streams {
190            let tick_tx = tick_tx.clone();
191            let candle_tx = candle_tx.clone();
192            let book_tx = book_tx.clone();
193            let shutdown = shutdown.clone();
194            tasks.push(tokio::spawn(async move {
195                loop {
196                    if shutdown.triggered() {
197                        break;
198                    }
199                    let mut emitted = false;
200
201                    let tick = tokio::select! {
202                        res = stream.next_tick() => res,
203                        _ = shutdown.wait() => break,
204                    };
205                    match tick {
206                        Ok(Some(event)) => {
207                            emitted = true;
208                            if tick_tx.send(event).await.is_err() {
209                                break;
210                            }
211                        }
212                        Ok(None) => {}
213                        Err(err) => {
214                            warn!(exchange = %name, error = %err, "market stream tick failed");
215                            break;
216                        }
217                    }
218
219                    let candle = tokio::select! {
220                        res = stream.next_candle() => res,
221                        _ = shutdown.wait() => break,
222                    };
223                    match candle {
224                        Ok(Some(event)) => {
225                            emitted = true;
226                            if candle_tx.send(event).await.is_err() {
227                                break;
228                            }
229                        }
230                        Ok(None) => {}
231                        Err(err) => {
232                            warn!(exchange = %name, error = %err, "market stream candle failed");
233                            break;
234                        }
235                    }
236
237                    let book = tokio::select! {
238                        res = stream.next_order_book() => res,
239                        _ = shutdown.wait() => break,
240                    };
241                    match book {
242                        Ok(Some(event)) => {
243                            emitted = true;
244                            if book_tx.send(event).await.is_err() {
245                                break;
246                            }
247                        }
248                        Ok(None) => {}
249                        Err(err) => {
250                            warn!(exchange = %name, error = %err, "market stream order book failed");
251                            break;
252                        }
253                    }
254
255                    if !emitted && !shutdown.sleep(Duration::from_millis(5)).await {
256                        break;
257                    }
258                }
259            }));
260        }
261        Self {
262            tick_rx,
263            candle_rx,
264            book_rx,
265            tasks,
266        }
267    }
268}
269
270#[async_trait::async_trait]
271impl LiveMarketStream for RouterMarketStream {
272    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
273        Ok(self.tick_rx.recv().await)
274    }
275
276    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
277        Ok(self.candle_rx.recv().await)
278    }
279
280    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
281        Ok(self.book_rx.recv().await)
282    }
283}
284
285impl Drop for RouterMarketStream {
286    fn drop(&mut self) {
287        for handle in &self.tasks {
288            handle.abort();
289        }
290    }
291}
292
293#[derive(Clone)]
294pub struct PersistenceSettings {
295    pub engine: PersistenceEngine,
296    pub state_path: PathBuf,
297    pub algo_path: PathBuf,
298}
299
300impl PersistenceSettings {
301    pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
302        let algo_path = match engine {
303            PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
304            PersistenceEngine::Lmdb => state_path.clone(),
305        };
306        Self {
307            engine,
308            state_path,
309            algo_path,
310        }
311    }
312
313    fn algo_repo_path(&self) -> &PathBuf {
314        &self.algo_path
315    }
316}
317
318struct PersistenceHandles {
319    state: Arc<dyn StateRepository<Snapshot = LiveState>>,
320    algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
321}
322
323#[derive(Clone)]
324pub struct NamedExchange {
325    pub name: String,
326    pub config: ExchangeConfig,
327}
328
329struct ExchangeRoute {
330    name: String,
331    driver: String,
332    #[cfg(feature = "binance")]
333    ws_url: String,
334    execution: Arc<dyn ExecutionClient>,
335}
336
337struct ExchangeBuildResult {
338    execution_client: Arc<dyn ExecutionClient>,
339    router: Option<Arc<RouterExecutionClient>>,
340    market_stream: Box<dyn LiveMarketStream>,
341    routes: Vec<ExchangeRoute>,
342}
343
344pub struct LiveSessionSettings {
345    pub category: PublicChannel,
346    pub interval: Interval,
347    pub quantity: Quantity,
348    pub slippage_bps: Decimal,
349    pub fee_bps: Decimal,
350    pub history: usize,
351    pub metrics_addr: SocketAddr,
352    pub persistence: PersistenceSettings,
353    pub initial_balances: HashMap<AssetId, Decimal>,
354    pub reporting_currency: AssetId,
355    pub markets_file: Option<PathBuf>,
356    pub alerting: AlertingConfig,
357    pub exec_backend: ExecutionBackend,
358    pub risk: RiskManagementConfig,
359    pub reconciliation_interval: Duration,
360    pub reconciliation_threshold: Decimal,
361    pub orderbook_depth: usize,
362    pub record_path: Option<PathBuf>,
363    pub control_addr: SocketAddr,
364    pub panic_close: PanicCloseConfig,
365}
366
367impl LiveSessionSettings {
368    fn risk_limits(&self) -> RiskLimits {
369        RiskLimits {
370            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
371            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
372            max_order_notional: self
373                .risk
374                .max_order_notional
375                .and_then(|limit| (limit > Decimal::ZERO).then_some(limit)),
376        }
377    }
378}
379
380fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
381    match settings.persistence.engine {
382        PersistenceEngine::Sqlite => {
383            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
384                SqliteStateRepository::new(settings.persistence.state_path.clone()),
385            );
386            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
387                SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
388            );
389            Ok(PersistenceHandles {
390                state: state_repo,
391                algo: algo_repo,
392            })
393        }
394        PersistenceEngine::Lmdb => {
395            let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
396            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
397                Arc::new(journal.state_repo());
398            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
399                Arc::new(journal.algo_repo());
400            Ok(PersistenceHandles {
401                state: state_repo,
402                algo: algo_repo,
403            })
404        }
405    }
406}
407
408pub async fn run_live(
409    strategy: Box<dyn Strategy>,
410    symbols: Vec<Symbol>,
411    exchanges: Vec<NamedExchange>,
412    settings: LiveSessionSettings,
413) -> Result<()> {
414    run_live_with_shutdown(
415        strategy,
416        symbols,
417        exchanges,
418        settings,
419        ShutdownSignal::new(),
420    )
421    .await
422}
423
424/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
425pub async fn run_live_with_shutdown(
426    strategy: Box<dyn Strategy>,
427    symbols: Vec<Symbol>,
428    exchanges: Vec<NamedExchange>,
429    settings: LiveSessionSettings,
430    shutdown: ShutdownSignal,
431) -> Result<()> {
432    if symbols.is_empty() {
433        return Err(anyhow!("strategy did not declare any subscriptions"));
434    }
435    if settings.quantity <= Decimal::ZERO {
436        return Err(anyhow!("--quantity must be positive"));
437    }
438
439    let public_connection = Arc::new(AtomicBool::new(false));
440    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
441        Some(Arc::new(AtomicBool::new(false)))
442    } else {
443        None
444    };
445    ensure_builtin_connectors_registered();
446    if exchanges.is_empty() {
447        return Err(anyhow!("no exchange profiles supplied"));
448    }
449    let symbol_codes: Vec<String> = symbols
450        .iter()
451        .map(|symbol| symbol.code().to_string())
452        .collect();
453    let driver_label = exchanges
454        .iter()
455        .map(|ex| ex.config.driver.clone())
456        .collect::<Vec<_>>()
457        .join(",");
458
459    let ExchangeBuildResult {
460        execution_client,
461        router,
462        market_stream,
463        routes,
464    } = build_exchange_routes(
465        &settings,
466        &exchanges,
467        &symbols,
468        &symbol_codes,
469        public_connection.clone(),
470        shutdown.clone(),
471    )
472    .await?;
473    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
474    if matches!(settings.exec_backend, ExecutionBackend::Live) {
475        info!(drivers = %driver_label, "live execution enabled");
476    }
477    let risk_checker: Arc<dyn PreTradeRiskChecker> =
478        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
479    let execution = ExecutionEngine::new(
480        execution_client.clone(),
481        Box::new(FixedOrderSizer {
482            quantity: settings.quantity,
483        }),
484        risk_checker,
485    );
486
487    let mut bootstrap = None;
488    if matches!(settings.exec_backend, ExecutionBackend::Live) {
489        info!("synchronizing portfolio snapshot from exchange");
490        let positions = execution_client
491            .positions()
492            .await
493            .context("failed to fetch remote positions")?;
494        let balances = execution_client
495            .account_balances()
496            .await
497            .context("failed to fetch remote account balances")?;
498        let mut open_orders = Vec::new();
499        for symbol in &symbols {
500            let mut symbol_orders = execution_client
501                .list_open_orders(*symbol)
502                .await
503                .with_context(|| format!("failed to fetch open orders for {}", symbol.code()))?;
504            open_orders.append(&mut symbol_orders);
505        }
506        bootstrap = Some(LiveBootstrap {
507            positions,
508            balances,
509            open_orders,
510        });
511    }
512
513    let persistence = build_persistence_handles(&settings)?;
514
515    let metrics = Arc::new(LiveMetrics::new());
516    let alerting_cfg = settings.alerting.clone();
517    let dispatcher = AlertDispatcher::new(alerting_cfg.webhook_url.clone());
518    let alerts = Arc::new(AlertManager::new(
519        alerting_cfg,
520        dispatcher,
521        Some(public_connection.clone()),
522        private_connection.clone(),
523    ));
524    let panic_hook: Arc<dyn PanicObserver> =
525        Arc::new(PanicAlertHook::new(metrics.clone(), alerts.clone()));
526
527    // Create orchestrator with execution engine
528    let initial_open_orders = bootstrap
529        .as_ref()
530        .map(|data| data.open_orders.clone())
531        .unwrap_or_default();
532    let orchestrator = OrderOrchestrator::new(
533        Arc::new(execution),
534        persistence.algo.clone(),
535        initial_open_orders,
536        settings.panic_close,
537        Some(panic_hook.clone()),
538    )
539    .await?;
540
541    let runtime = LiveRuntime::new(
542        market_stream,
543        strategy,
544        symbols,
545        routes,
546        router.clone(),
547        orchestrator,
548        persistence.state,
549        settings,
550        metrics,
551        alerts,
552        market_registry,
553        shutdown,
554        public_connection,
555        private_connection,
556        bootstrap,
557    )
558    .await?;
559    runtime.run().await
560}
561
562async fn build_exchange_routes(
563    settings: &LiveSessionSettings,
564    exchanges: &[NamedExchange],
565    symbols: &[Symbol],
566    symbol_codes: &[String],
567    connection_flag: Arc<AtomicBool>,
568    shutdown: ShutdownSignal,
569) -> Result<ExchangeBuildResult> {
570    let mut stream_sources: Vec<(String, Box<dyn LiveMarketStream>)> = Vec::new();
571    let mut router_inputs: HashMap<ExchangeId, Arc<dyn ExecutionClient>> = HashMap::new();
572    let mut routes = Vec::new();
573
574    for exchange in exchanges {
575        let payload = build_exchange_payload(&exchange.config, settings, &exchange.name);
576        let driver = exchange.config.driver.clone();
577        let factory = get_connector_factory(&driver)
578            .ok_or_else(|| anyhow!("driver {} is not registered", driver))?;
579        let stream_config = ConnectorStreamConfig {
580            ws_url: Some(exchange.config.ws_url.clone()),
581            metadata: json!({
582                "category": settings.category.as_path(),
583                "symbols": symbol_codes,
584                "orderbook_depth": settings.orderbook_depth,
585            }),
586            connection_status: Some(connection_flag.clone()),
587        };
588        let mut connector_stream = factory
589            .create_market_stream(&payload, stream_config)
590            .await
591            .map_err(|err| {
592                anyhow!(
593                    "failed to create market stream for {}: {err}",
594                    exchange.name
595                )
596            })?;
597        connector_stream
598            .subscribe(symbol_codes, settings.interval)
599            .await
600            .map_err(|err| anyhow!("failed to subscribe {}: {err}", exchange.name))?;
601        stream_sources.push((
602            exchange.name.clone(),
603            Box::new(FactoryStreamAdapter::new(connector_stream)),
604        ));
605
606        let execution_client =
607            build_single_execution_client(settings, &driver, factory, &payload, symbols).await?;
608        let exchange_id = ExchangeId::from(exchange.name.as_str());
609        router_inputs.insert(exchange_id, execution_client.clone());
610        routes.push(ExchangeRoute {
611            name: exchange.name.clone(),
612            driver,
613            #[cfg(feature = "binance")]
614            ws_url: exchange.config.ws_url.clone(),
615            execution: execution_client.clone(),
616        });
617    }
618
619    let (execution_client, router_handle): (
620        Arc<dyn ExecutionClient>,
621        Option<Arc<RouterExecutionClient>>,
622    ) = if router_inputs.len() == 1 {
623        (router_inputs.into_values().next().unwrap(), None)
624    } else {
625        let router = Arc::new(RouterExecutionClient::new(router_inputs));
626        (router.clone(), Some(router))
627    };
628
629    let market_stream: Box<dyn LiveMarketStream> = if stream_sources.len() == 1 {
630        stream_sources.into_iter().next().unwrap().1
631    } else {
632        Box::new(RouterMarketStream::new(stream_sources, shutdown))
633    };
634
635    Ok(ExchangeBuildResult {
636        execution_client,
637        router: router_handle,
638        market_stream,
639        routes,
640    })
641}
642
643async fn build_single_execution_client(
644    settings: &LiveSessionSettings,
645    driver: &str,
646    connector_factory: Arc<dyn ConnectorFactory>,
647    connector_payload: &Value,
648    symbols: &[Symbol],
649) -> Result<Arc<dyn ExecutionClient>> {
650    match settings.exec_backend {
651        ExecutionBackend::Paper => {
652            if driver == "paper" {
653                return connector_factory
654                    .create_execution_client(connector_payload)
655                    .await
656                    .map_err(|err| anyhow!("failed to create execution client: {err}"));
657            }
658            Ok(Arc::new(PaperExecutionClient::new(
659                format!("paper-{driver}"),
660                symbols.to_vec(),
661                settings.slippage_bps,
662                FeeScheduleConfig::with_defaults(
663                    settings.fee_bps.max(Decimal::ZERO),
664                    settings.fee_bps.max(Decimal::ZERO),
665                )
666                .build_model(),
667            )))
668        }
669        ExecutionBackend::Live => connector_factory
670            .create_execution_client(connector_payload)
671            .await
672            .map_err(|err| anyhow!("failed to create execution client: {err}")),
673    }
674}
675
676struct LiveRuntime {
677    market: Box<dyn LiveMarketStream>,
678    orchestrator: Arc<OrderOrchestrator>,
679    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
680    persisted: Arc<Mutex<LiveState>>,
681    event_bus: Arc<EventBus>,
682    recorder: Option<ParquetRecorder>,
683    control_task: Option<JoinHandle<()>>,
684    shutdown: ShutdownSignal,
685    metrics_task: JoinHandle<()>,
686    alert_task: Option<JoinHandle<()>>,
687    reconciliation_task: Option<JoinHandle<()>>,
688    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
689    private_event_rx: mpsc::Receiver<BrokerEvent>,
690    #[allow(dead_code)]
691    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
692    subscriber_handles: Vec<JoinHandle<()>>,
693    connection_monitors: Vec<JoinHandle<()>>,
694    order_timeout_task: JoinHandle<()>,
695    strategy: Arc<Mutex<Box<dyn Strategy>>>,
696    _public_connection: Arc<AtomicBool>,
697    _private_connection: Option<Arc<AtomicBool>>,
698}
699
700struct LiveBootstrap {
701    positions: Vec<Position>,
702    balances: Vec<AccountBalance>,
703    open_orders: Vec<Order>,
704}
705
706impl LiveRuntime {
707    #[allow(clippy::too_many_arguments)]
708    async fn new(
709        market: Box<dyn LiveMarketStream>,
710        mut strategy: Box<dyn Strategy>,
711        symbols: Vec<Symbol>,
712        exchanges: Vec<ExchangeRoute>,
713        router: Option<Arc<RouterExecutionClient>>,
714        orchestrator: OrderOrchestrator,
715        state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
716        settings: LiveSessionSettings,
717        metrics: Arc<LiveMetrics>,
718        alerts: Arc<AlertManager>,
719        market_registry: Arc<MarketRegistry>,
720        shutdown: ShutdownSignal,
721        public_connection: Arc<AtomicBool>,
722        private_connection: Option<Arc<AtomicBool>>,
723        bootstrap: Option<LiveBootstrap>,
724    ) -> Result<Self> {
725        let mut strategy_ctx = StrategyContext::new(settings.history);
726        strategy_ctx.attach_market_registry(market_registry.clone());
727        let mut persisted = match tokio::task::spawn_blocking({
728            let repo = state_repo.clone();
729            move || repo.load()
730        })
731        .await
732        {
733            Ok(Ok(state)) => state,
734            Ok(Err(err)) => {
735                warn!(error = %err, "failed to load live state; starting from defaults");
736                LiveState::default()
737            }
738            Err(err) => {
739                warn!(error = %err, "state load task failed; starting from defaults");
740                LiveState::default()
741            }
742        };
743        let mut live_bootstrap = None;
744        if let Some(data) = bootstrap {
745            persisted.open_orders = data.open_orders;
746            live_bootstrap = Some((data.positions, data.balances));
747        } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
748            warn!("live session missing bootstrap data; continuing without remote snapshot");
749        }
750
751        let portfolio_cfg = PortfolioConfig {
752            initial_balances: settings.initial_balances.clone(),
753            reporting_currency: settings.reporting_currency,
754            max_drawdown: Some(settings.risk.max_drawdown),
755        };
756        let portfolio = if let Some((positions, balances)) = live_bootstrap {
757            Portfolio::from_exchange_state(
758                positions,
759                balances,
760                portfolio_cfg.clone(),
761                market_registry.clone(),
762            )
763        } else if let Some(snapshot) = persisted.portfolio.take() {
764            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
765        } else {
766            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
767        };
768        strategy_ctx.update_positions(portfolio.positions());
769        // Restore strategy state if found in persistence
770        if let Some(state) = persisted.strategy_state.take() {
771            info!("restoring strategy state from persistence");
772            strategy
773                .restore(state)
774                .context("failed to restore strategy state")?;
775        }
776        persisted.portfolio = Some(portfolio.snapshot());
777
778        let mut market_snapshots = HashMap::new();
779        for symbol in &symbols {
780            let mut snapshot = MarketSnapshot::default();
781            if let Some(price) = persisted.last_prices.get(symbol).copied() {
782                snapshot.last_trade = Some(price);
783            }
784            market_snapshots.insert(*symbol, snapshot);
785        }
786
787        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
788        if let Some(flag) = &private_connection {
789            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
790        }
791        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
792        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
793        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
794        let alert_task = alerts.spawn_watchdog();
795        let mut connection_monitors = Vec::new();
796        connection_monitors.push(spawn_connection_monitor(
797            shutdown.clone(),
798            public_connection.clone(),
799            metrics.clone(),
800            "public",
801        ));
802        if let Some(flag) = private_connection.clone() {
803            connection_monitors.push(spawn_connection_monitor(
804                shutdown.clone(),
805                flag,
806                metrics.clone(),
807                "private",
808            ));
809        }
810
811        if !settings.exec_backend.is_paper() {
812            let router_handle = router.clone();
813            for route in &exchanges {
814                match route.driver.as_str() {
815                    "bybit" | "" => {
816                        #[cfg(feature = "bybit")]
817                        {
818                            let bybit = route
819                                .execution
820                                .as_ref()
821                                .as_any()
822                                .downcast_ref::<BybitClient>()
823                                .ok_or_else(|| {
824                                    anyhow!("execution client for {} is not Bybit", route.name)
825                                })?;
826                            let creds = bybit.get_credentials().ok_or_else(|| {
827                                anyhow!("live execution requires Bybit credentials")
828                            })?;
829                            spawn_bybit_private_stream(
830                                creds,
831                                bybit.get_ws_url(),
832                                private_event_tx.clone(),
833                                route.execution.clone(),
834                                symbols.clone(),
835                                last_private_sync.clone(),
836                                private_connection.clone(),
837                                metrics.clone(),
838                                router_handle.clone(),
839                                shutdown.clone(),
840                            );
841                        }
842                        #[cfg(not(feature = "bybit"))]
843                        {
844                            bail!("driver 'bybit' is unavailable without the 'bybit' feature");
845                        }
846                    }
847                    "binance" => {
848                        #[cfg(feature = "binance")]
849                        {
850                            spawn_binance_private_stream(
851                                route.execution.clone(),
852                                route.ws_url.clone(),
853                                private_event_tx.clone(),
854                                private_connection.clone(),
855                                metrics.clone(),
856                                router_handle.clone(),
857                                shutdown.clone(),
858                            );
859                        }
860                        #[cfg(not(feature = "binance"))]
861                        {
862                            bail!("driver 'binance' is unavailable without the 'binance' feature");
863                        }
864                    }
865                    "paper" => {}
866                    other => {
867                        bail!("private stream unsupported for driver '{other}'");
868                    }
869                }
870            }
871        }
872
873        let recorder = if let Some(record_path) = settings.record_path.clone() {
874            let config = RecorderConfig {
875                root: record_path.clone(),
876                ..RecorderConfig::default()
877            };
878            match ParquetRecorder::spawn(config).await {
879                Ok(recorder) => {
880                    info!(path = %record_path.display(), "flight recorder enabled");
881                    Some(recorder)
882                }
883                Err(err) => {
884                    warn!(
885                        error = %err,
886                        path = %record_path.display(),
887                        "failed to start flight recorder"
888                    );
889                    None
890                }
891            }
892        } else {
893            None
894        };
895        let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
896
897        let strategy = Arc::new(Mutex::new(strategy));
898        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
899        let portfolio = Arc::new(Mutex::new(portfolio));
900        let market_cache = Arc::new(Mutex::new(market_snapshots));
901        let persisted = Arc::new(Mutex::new(persisted));
902        let orchestrator = Arc::new(orchestrator);
903        let event_bus = Arc::new(EventBus::new(2048));
904        let last_data_timestamp = Arc::new(AtomicI64::new(0));
905        let control_task = control::spawn_control_plane(
906            settings.control_addr,
907            control::ControlPlaneComponents {
908                portfolio: portfolio.clone(),
909                orchestrator: orchestrator.clone(),
910                persisted: persisted.clone(),
911                last_data_timestamp: last_data_timestamp.clone(),
912                event_bus: event_bus.clone(),
913                strategy: strategy.clone(),
914                shutdown: shutdown.clone(),
915            },
916        );
917        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
918            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
919                client: orchestrator.execution_engine().client(),
920                portfolio: portfolio.clone(),
921                persisted: persisted.clone(),
922                state_repo: state_repo.clone(),
923                alerts: alerts.clone(),
924                metrics: metrics.clone(),
925                reporting_currency: settings.reporting_currency,
926                threshold: settings.reconciliation_threshold,
927            }))
928        });
929        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
930            spawn_reconciliation_loop(
931                ctx.clone(),
932                shutdown.clone(),
933                settings.reconciliation_interval,
934            )
935        });
936        let driver_summary = Arc::new(if exchanges.is_empty() {
937            "unknown".to_string()
938        } else {
939            exchanges
940                .iter()
941                .map(|route| route.driver.clone())
942                .collect::<Vec<_>>()
943                .join(",")
944        });
945        let subscriber_handles = spawn_event_subscribers(
946            event_bus.clone(),
947            strategy.clone(),
948            strategy_ctx.clone(),
949            orchestrator.clone(),
950            portfolio.clone(),
951            metrics.clone(),
952            alerts.clone(),
953            market_cache.clone(),
954            state_repo.clone(),
955            persisted.clone(),
956            settings.exec_backend,
957            recorder_handle.clone(),
958            last_data_timestamp.clone(),
959            driver_summary.clone(),
960            market_registry.clone(),
961        );
962        let order_timeout_task = spawn_order_timeout_monitor(
963            orchestrator.clone(),
964            event_bus.clone(),
965            alerts.clone(),
966            shutdown.clone(),
967        );
968
969        info!(
970            symbols = ?symbols,
971            category = ?settings.category,
972            metrics_addr = %settings.metrics_addr,
973            state_path = %settings.persistence.state_path.display(),
974            persistence_engine = ?settings.persistence.engine,
975            history = settings.history,
976            "market stream ready"
977        );
978
979        for symbol in &symbols {
980            let ctx = shared_risk_context(
981                *symbol,
982                &portfolio,
983                &market_cache,
984                &persisted,
985                &market_registry,
986            )
987            .await;
988            orchestrator.update_risk_context(*symbol, ctx);
989        }
990
991        Ok(Self {
992            market,
993            orchestrator,
994            state_repo,
995            persisted,
996            event_bus,
997            recorder,
998            control_task: Some(control_task),
999            shutdown,
1000            metrics_task,
1001            alert_task,
1002            reconciliation_task,
1003            reconciliation_ctx,
1004            private_event_rx,
1005            last_private_sync,
1006            subscriber_handles,
1007            connection_monitors,
1008            order_timeout_task,
1009            strategy,
1010            _public_connection: public_connection,
1011            _private_connection: private_connection,
1012        })
1013    }
1014
1015    async fn run(mut self) -> Result<()> {
1016        info!("live session started");
1017        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
1018            perform_state_reconciliation(ctx.as_ref())
1019                .await
1020                .context("initial state reconciliation failed")?;
1021        }
1022        let backoff = Duration::from_millis(200);
1023        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
1024
1025        'run: while !self.shutdown.triggered() {
1026            let mut progressed = false;
1027
1028            let tick = tokio::select! {
1029                res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_tick()) => Some(res),
1030                _ = self.shutdown.wait() => None,
1031            };
1032            match tick {
1033                Some(Ok(Ok(Some(tick)))) => {
1034                    progressed = true;
1035                    self.event_bus.publish(Event::Tick(TickEvent { tick }));
1036                }
1037                Some(Ok(Ok(None))) => {}
1038                Some(Ok(Err(err))) => return Err(err.into()),
1039                Some(Err(_)) => {}
1040                None => break 'run,
1041            }
1042
1043            let candle = tokio::select! {
1044                res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_candle()) => Some(res),
1045                _ = self.shutdown.wait() => None,
1046            };
1047            match candle {
1048                Some(Ok(Ok(Some(candle)))) => {
1049                    progressed = true;
1050                    self.event_bus
1051                        .publish(Event::Candle(CandleEvent { candle }));
1052                }
1053                Some(Ok(Ok(None))) => {}
1054                Some(Ok(Err(err))) => return Err(err.into()),
1055                Some(Err(_)) => {}
1056                None => break 'run,
1057            }
1058
1059            let book = tokio::select! {
1060                res = tokio::time::timeout(MARKET_EVENT_TIMEOUT, self.market.next_order_book()) => Some(res),
1061                _ = self.shutdown.wait() => None,
1062            };
1063            match book {
1064                Some(Ok(Ok(Some(book)))) => {
1065                    progressed = true;
1066                    self.event_bus
1067                        .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
1068                }
1069                Some(Ok(Ok(None))) => {}
1070                Some(Ok(Err(err))) => return Err(err.into()),
1071                Some(Err(_)) => {}
1072                None => break 'run,
1073            }
1074
1075            tokio::select! {
1076                biased;
1077                Some(event) = self.private_event_rx.recv() => {
1078                    progressed = true;
1079                    match event {
1080                        BrokerEvent::OrderUpdate(order) => {
1081                            info!(
1082                                order_id = %order.id,
1083                                status = ?order.status,
1084                                symbol = %order.request.symbol,
1085                                "received private order update"
1086                            );
1087                            self.event_bus
1088                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1089                        }
1090                        BrokerEvent::Fill(fill) => {
1091                            info!(
1092                                order_id = %fill.order_id,
1093                                symbol = %fill.symbol,
1094                                qty = %fill.fill_quantity,
1095                                price = %fill.fill_price,
1096                                "received private fill"
1097                            );
1098                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
1099                        }
1100                    }
1101                }
1102                _ = orchestrator_timer.tick() => {
1103                    // Drive TWAP and other time-based algorithms
1104                    if let Err(e) = self.orchestrator.on_timer_tick().await {
1105                        error!("Orchestrator timer tick failed: {}", e);
1106                    }
1107                }
1108                _ = self.shutdown.wait() => break 'run,
1109                else => {}
1110            }
1111
1112            if !progressed && !self.shutdown.sleep(backoff).await {
1113                break;
1114            }
1115        }
1116        info!("live session stopping");
1117        self.metrics_task.abort();
1118        if let Some(handle) = self.alert_task.take() {
1119            handle.abort();
1120        }
1121        if let Some(handle) = self.reconciliation_task.take() {
1122            handle.abort();
1123        }
1124        self.order_timeout_task.abort();
1125        for handle in self.subscriber_handles.drain(..) {
1126            handle.abort();
1127        }
1128        for handle in self.connection_monitors.drain(..) {
1129            handle.abort();
1130        }
1131        if let Err(err) = persist_state(
1132            self.state_repo.clone(),
1133            self.persisted.clone(),
1134            Some(self.strategy.clone()),
1135        )
1136        .await
1137        {
1138            warn!(error = %err, "failed to persist shutdown state");
1139        }
1140        if let Some(task) = self.control_task.take() {
1141            if let Err(err) = task.await {
1142                warn!(error = %err, "control plane server task aborted");
1143            }
1144        }
1145        if let Some(recorder) = self.recorder.take() {
1146            if let Err(err) = recorder.shutdown().await {
1147                warn!(error = %err, "failed to flush flight recorder");
1148            }
1149        }
1150        Ok(())
1151    }
1152}
1153
1154struct ReconciliationContext {
1155    client: Arc<dyn ExecutionClient>,
1156    portfolio: Arc<Mutex<Portfolio>>,
1157    persisted: Arc<Mutex<LiveState>>,
1158    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1159    alerts: Arc<AlertManager>,
1160    metrics: Arc<LiveMetrics>,
1161    reporting_currency: AssetId,
1162    threshold: Decimal,
1163}
1164
1165struct ReconciliationContextConfig {
1166    client: Arc<dyn ExecutionClient>,
1167    portfolio: Arc<Mutex<Portfolio>>,
1168    persisted: Arc<Mutex<LiveState>>,
1169    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1170    alerts: Arc<AlertManager>,
1171    metrics: Arc<LiveMetrics>,
1172    reporting_currency: AssetId,
1173    threshold: Decimal,
1174}
1175
1176impl ReconciliationContext {
1177    fn new(config: ReconciliationContextConfig) -> Self {
1178        let ReconciliationContextConfig {
1179            client,
1180            portfolio,
1181            persisted,
1182            state_repo,
1183            alerts,
1184            metrics,
1185            reporting_currency,
1186            threshold,
1187        } = config;
1188        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
1189        let threshold = if threshold <= Decimal::ZERO {
1190            min_threshold
1191        } else {
1192            threshold
1193        };
1194        Self {
1195            client,
1196            portfolio,
1197            persisted,
1198            state_repo,
1199            alerts,
1200            metrics,
1201            reporting_currency,
1202            threshold,
1203        }
1204    }
1205}
1206
1207fn spawn_reconciliation_loop(
1208    ctx: Arc<ReconciliationContext>,
1209    shutdown: ShutdownSignal,
1210    interval: Duration,
1211) -> JoinHandle<()> {
1212    tokio::spawn(async move {
1213        while shutdown.sleep(interval).await {
1214            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
1215                error!(error = %err, "periodic state reconciliation failed");
1216            }
1217        }
1218    })
1219}
1220
1221async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
1222    info!("running state reconciliation");
1223    let remote_positions = ctx
1224        .client
1225        .positions()
1226        .await
1227        .context("failed to fetch remote positions")?;
1228    let remote_balances = ctx
1229        .client
1230        .account_balances()
1231        .await
1232        .context("failed to fetch remote balances")?;
1233    let (local_positions, local_cash) = {
1234        let guard = ctx.portfolio.lock().await;
1235        (guard.positions(), guard.cash())
1236    };
1237
1238    let remote_map = positions_to_map(remote_positions);
1239    let local_map = positions_to_map(local_positions);
1240    let mut tracked_symbols: HashSet<Symbol> = HashSet::new();
1241    tracked_symbols.extend(remote_map.keys().cloned());
1242    tracked_symbols.extend(local_map.keys().cloned());
1243
1244    let mut severe_findings = Vec::new();
1245    for symbol in tracked_symbols {
1246        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
1247        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
1248        let diff = (local_qty - remote_qty).abs();
1249        let diff_value = diff.to_f64().unwrap_or(0.0);
1250        let symbol_name = symbol.code().to_string();
1251        ctx.metrics.update_position_diff(&symbol_name, diff_value);
1252        if diff > Decimal::ZERO {
1253            warn!(
1254                symbol = %symbol_name,
1255                local = %local_qty,
1256                remote = %remote_qty,
1257                diff = %diff,
1258                "position mismatch detected during reconciliation"
1259            );
1260            let pct = normalize_diff(diff, remote_qty);
1261            if pct >= ctx.threshold {
1262                error!(
1263                    symbol = %symbol_name,
1264                    local = %local_qty,
1265                    remote = %remote_qty,
1266                    diff = %diff,
1267                    pct = %pct,
1268                    "position mismatch exceeds threshold"
1269                );
1270                severe_findings.push(format!(
1271                    "{symbol_name} local={local_qty} remote={remote_qty} diff={diff}"
1272                ));
1273            }
1274        }
1275    }
1276
1277    let reporting = ctx.reporting_currency;
1278    let reporting_label = reporting.to_string();
1279    let remote_cash = remote_balances
1280        .iter()
1281        .find(|balance| balance.asset == reporting)
1282        .map(|balance| balance.available)
1283        .unwrap_or_else(|| Decimal::ZERO);
1284    let cash_diff = (remote_cash - local_cash).abs();
1285    ctx.metrics
1286        .update_balance_diff(&reporting_label, cash_diff.to_f64().unwrap_or(0.0));
1287    if cash_diff > Decimal::ZERO {
1288        warn!(
1289            currency = %reporting_label,
1290            local = %local_cash,
1291            remote = %remote_cash,
1292            diff = %cash_diff,
1293            "balance mismatch detected during reconciliation"
1294        );
1295        let pct = normalize_diff(cash_diff, remote_cash);
1296        if pct >= ctx.threshold {
1297            error!(
1298                currency = %reporting_label,
1299                local = %local_cash,
1300                remote = %remote_cash,
1301                diff = %cash_diff,
1302                pct = %pct,
1303                "balance mismatch exceeds threshold"
1304            );
1305            severe_findings.push(format!(
1306                "{reporting_label} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
1307            ));
1308        }
1309    }
1310
1311    if severe_findings.is_empty() {
1312        info!("state reconciliation complete with no critical divergence");
1313        return Ok(());
1314    }
1315
1316    let alert_body = severe_findings.join("; ");
1317    ctx.alerts
1318        .notify("State reconciliation divergence", &alert_body)
1319        .await;
1320    enforce_liquidate_only(ctx).await;
1321    Ok(())
1322}
1323
1324async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
1325    let snapshot = {
1326        let mut guard = ctx.portfolio.lock().await;
1327        if !guard.set_liquidate_only(true) {
1328            return;
1329        }
1330        info!("entering liquidate-only mode due to reconciliation divergence");
1331        guard.snapshot()
1332    };
1333    {
1334        let mut state = ctx.persisted.lock().await;
1335        state.portfolio = Some(snapshot);
1336    }
1337    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1338        warn!(error = %err, "failed to persist liquidate-only transition");
1339    }
1340}
1341
1342fn positions_to_map(positions: Vec<Position>) -> HashMap<Symbol, Decimal> {
1343    let mut map = HashMap::new();
1344    for position in positions {
1345        map.insert(position.symbol, position_signed_qty(&position));
1346    }
1347    map
1348}
1349
1350fn position_signed_qty(position: &Position) -> Decimal {
1351    match position.side {
1352        Some(Side::Buy) => position.quantity,
1353        Some(Side::Sell) => -position.quantity,
1354        None => Decimal::ZERO,
1355    }
1356}
1357
1358fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1359    if diff <= Decimal::ZERO {
1360        Decimal::ZERO
1361    } else {
1362        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1363        diff / denominator
1364    }
1365}
1366
1367fn build_exchange_payload(
1368    exchange: &ExchangeConfig,
1369    settings: &LiveSessionSettings,
1370    name: &str,
1371) -> Value {
1372    let mut payload = serde_json::Map::new();
1373    payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1374    payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1375    payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1376    payload.insert(
1377        "api_secret".into(),
1378        Value::String(exchange.api_secret.clone()),
1379    );
1380    payload.insert(
1381        "category".into(),
1382        Value::String(settings.category.as_path().to_string()),
1383    );
1384    payload.insert("exchange".into(), Value::String(name.to_string()));
1385    payload.insert(
1386        "orderbook_depth".into(),
1387        Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1388    );
1389    if let Value::Object(extra) = exchange.params.clone() {
1390        for (key, value) in extra {
1391            payload.insert(key, value);
1392        }
1393    }
1394    Value::Object(payload)
1395}
1396
1397#[derive(Default)]
1398struct MarketSnapshot {
1399    last_trade: Option<Price>,
1400    last_trade_ts: Option<DateTime<Utc>>,
1401    last_candle: Option<Candle>,
1402}
1403
1404impl MarketSnapshot {
1405    fn price(&self) -> Option<Price> {
1406        self.last_trade
1407            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1408    }
1409}
1410
1411pub struct ShutdownSignal {
1412    flag: Arc<AtomicBool>,
1413    notify: Arc<Notify>,
1414}
1415
1416impl ShutdownSignal {
1417    pub fn new() -> Self {
1418        let flag = Arc::new(AtomicBool::new(false));
1419        let notify = Arc::new(Notify::new());
1420        let flag_clone = flag.clone();
1421        let notify_clone = notify.clone();
1422        tokio::spawn(async move {
1423            if tokio::signal::ctrl_c().await.is_ok() {
1424                flag_clone.store(true, Ordering::SeqCst);
1425                notify_clone.notify_waiters();
1426            }
1427        });
1428        Self { flag, notify }
1429    }
1430
1431    pub fn trigger(&self) {
1432        self.flag.store(true, Ordering::SeqCst);
1433        self.notify.notify_waiters();
1434    }
1435
1436    pub fn triggered(&self) -> bool {
1437        self.flag.load(Ordering::SeqCst)
1438    }
1439
1440    pub async fn wait(&self) {
1441        if self.triggered() {
1442            return;
1443        }
1444        self.notify.notified().await;
1445    }
1446
1447    async fn sleep(&self, duration: Duration) -> bool {
1448        tokio::select! {
1449            _ = tokio::time::sleep(duration) => true,
1450            _ = self.notify.notified() => false,
1451        }
1452    }
1453}
1454
1455impl Default for ShutdownSignal {
1456    fn default() -> Self {
1457        Self::new()
1458    }
1459}
1460
1461impl Clone for ShutdownSignal {
1462    fn clone(&self) -> Self {
1463        Self {
1464            flag: self.flag.clone(),
1465            notify: self.notify.clone(),
1466        }
1467    }
1468}
1469
1470#[allow(clippy::too_many_arguments)]
1471fn spawn_event_subscribers(
1472    bus: Arc<EventBus>,
1473    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1474    strategy_ctx: Arc<Mutex<StrategyContext>>,
1475    orchestrator: Arc<OrderOrchestrator>,
1476    portfolio: Arc<Mutex<Portfolio>>,
1477    metrics: Arc<LiveMetrics>,
1478    alerts: Arc<AlertManager>,
1479    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1480    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1481    persisted: Arc<Mutex<LiveState>>,
1482    exec_backend: ExecutionBackend,
1483    recorder: Option<RecorderHandle>,
1484    last_data_timestamp: Arc<AtomicI64>,
1485    driver: Arc<String>,
1486    market_registry: Arc<MarketRegistry>,
1487) -> Vec<JoinHandle<()>> {
1488    let mut handles = Vec::new();
1489    let market_recorder = recorder.clone();
1490
1491    let market_bus = bus.clone();
1492    let market_strategy = strategy.clone();
1493    let market_ctx = strategy_ctx.clone();
1494    let market_metrics = metrics.clone();
1495    let market_alerts = alerts.clone();
1496    let market_state = state_repo.clone();
1497    let market_persisted = persisted.clone();
1498    let market_portfolio = portfolio.clone();
1499    let market_snapshot = market.clone();
1500    let orchestrator_clone = orchestrator.clone();
1501    let market_data_tracker = last_data_timestamp.clone();
1502    let market_catalog = market_registry.clone();
1503    let driver_clone = driver.clone();
1504    handles.push(tokio::spawn(async move {
1505        let recorder = market_recorder;
1506        let mut stream = market_bus.subscribe();
1507        loop {
1508            match stream.recv().await {
1509                Ok(Event::Tick(evt)) => {
1510                    if let Some(handle) = recorder.as_ref() {
1511                        handle.record_tick(evt.tick.clone());
1512                    }
1513                    if let Err(err) = process_tick_event(
1514                        evt.tick,
1515                        market_strategy.clone(),
1516                        market_ctx.clone(),
1517                        market_metrics.clone(),
1518                        market_alerts.clone(),
1519                        market_snapshot.clone(),
1520                        market_portfolio.clone(),
1521                        market_state.clone(),
1522                        market_persisted.clone(),
1523                        market_bus.clone(),
1524                        market_data_tracker.clone(),
1525                        market_catalog.clone(),
1526                    )
1527                    .await
1528                    {
1529                        warn!(error = %err, "tick handler failed");
1530                    }
1531                }
1532                Ok(Event::Candle(evt)) => {
1533                    if let Some(handle) = recorder.as_ref() {
1534                        handle.record_candle(evt.candle.clone());
1535                    }
1536                    if let Err(err) = process_candle_event(
1537                        evt.candle,
1538                        market_strategy.clone(),
1539                        market_ctx.clone(),
1540                        market_metrics.clone(),
1541                        market_alerts.clone(),
1542                        market_snapshot.clone(),
1543                        market_portfolio.clone(),
1544                        orchestrator_clone.clone(),
1545                        exec_backend,
1546                        market_state.clone(),
1547                        market_persisted.clone(),
1548                        market_bus.clone(),
1549                        market_data_tracker.clone(),
1550                        market_catalog.clone(),
1551                    )
1552                    .await
1553                    {
1554                        warn!(error = %err, "candle handler failed");
1555                    }
1556                }
1557                Ok(Event::OrderBook(evt)) => {
1558                    if let Some(handle) = recorder.as_ref() {
1559                        handle.record_order_book(evt.order_book.clone());
1560                    }
1561                    if let Err(err) = process_order_book_event(
1562                        evt.order_book,
1563                        market_strategy.clone(),
1564                        market_ctx.clone(),
1565                        market_metrics.clone(),
1566                        market_alerts.clone(),
1567                        market_snapshot.clone(),
1568                        market_bus.clone(),
1569                        market_data_tracker.clone(),
1570                        driver_clone.clone(),
1571                        market_catalog.clone(),
1572                    )
1573                    .await
1574                    {
1575                        warn!(error = %err, "order book handler failed");
1576                    }
1577                }
1578                Ok(_) => {}
1579                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1580                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1581                    warn!(lag = lag, "market subscriber lagged");
1582                    continue;
1583                }
1584            }
1585        }
1586    }));
1587
1588    let exec_bus = bus.clone();
1589    let exec_portfolio = portfolio.clone();
1590    let exec_market = market.clone();
1591    let exec_persisted = persisted.clone();
1592    let exec_alerts = alerts.clone();
1593    let exec_metrics = metrics.clone();
1594    let exec_orchestrator = orchestrator.clone();
1595    let exec_recorder = recorder.clone();
1596    handles.push(tokio::spawn(async move {
1597        let orchestrator = exec_orchestrator.clone();
1598        let recorder = exec_recorder;
1599        let mut stream = exec_bus.subscribe();
1600        loop {
1601            match stream.recv().await {
1602                Ok(Event::Signal(evt)) => {
1603                    if let Some(handle) = recorder.as_ref() {
1604                        handle.record_signal(evt.signal.clone());
1605                    }
1606                    if let Err(err) = process_signal_event(
1607                        evt.signal,
1608                        orchestrator.clone(),
1609                        exec_portfolio.clone(),
1610                        exec_market.clone(),
1611                        exec_persisted.clone(),
1612                        exec_alerts.clone(),
1613                        exec_metrics.clone(),
1614                        market_registry.clone(),
1615                    )
1616                    .await
1617                    {
1618                        warn!(error = %err, "signal handler failed");
1619                    }
1620                }
1621                Ok(_) => {}
1622                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1623                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1624                    warn!(lag = lag, "signal subscriber lagged");
1625                    continue;
1626                }
1627            }
1628        }
1629    }));
1630
1631    let fill_bus = bus.clone();
1632    let fill_state = state_repo.clone();
1633    let fill_orchestrator = orchestrator.clone();
1634    let fill_persisted = persisted.clone();
1635    let fill_alerts = alerts.clone();
1636    let fill_recorder = recorder.clone();
1637    handles.push(tokio::spawn(async move {
1638        let orchestrator = fill_orchestrator.clone();
1639        let persisted = fill_persisted.clone();
1640        let recorder = fill_recorder;
1641        let mut stream = fill_bus.subscribe();
1642        loop {
1643            match stream.recv().await {
1644                Ok(Event::Fill(evt)) => {
1645                    if let Some(handle) = recorder.as_ref() {
1646                        handle.record_fill(evt.fill.clone());
1647                    }
1648                    if let Err(err) = process_fill_event(
1649                        evt.fill,
1650                        portfolio.clone(),
1651                        strategy.clone(),
1652                        strategy_ctx.clone(),
1653                        orchestrator.clone(),
1654                        metrics.clone(),
1655                        fill_alerts.clone(),
1656                        fill_state.clone(),
1657                        persisted.clone(),
1658                    )
1659                    .await
1660                    {
1661                        warn!(error = ?err, "fill handler failed");
1662                    }
1663                }
1664                Ok(_) => {}
1665                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1666                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1667                    warn!(lag = lag, "fill subscriber lagged");
1668                    continue;
1669                }
1670            }
1671        }
1672    }));
1673
1674    let order_bus = bus.clone();
1675    let order_persisted = persisted.clone();
1676    let order_alerts = alerts.clone();
1677    let order_orchestrator = orchestrator.clone();
1678    // Note: We don't pass strategy to order update handler to avoid lock contention
1679    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
1680    let order_recorder = recorder;
1681    handles.push(tokio::spawn(async move {
1682        let orchestrator = order_orchestrator.clone();
1683        let persisted = order_persisted.clone();
1684        let recorder = order_recorder;
1685        let mut stream = order_bus.subscribe();
1686        loop {
1687            match stream.recv().await {
1688                Ok(Event::OrderUpdate(evt)) => {
1689                    if let Some(handle) = recorder.as_ref() {
1690                        handle.record_order(evt.order.clone());
1691                    }
1692                    if let Err(err) = process_order_update_event(
1693                        evt.order,
1694                        orchestrator.clone(),
1695                        order_alerts.clone(),
1696                        state_repo.clone(),
1697                        persisted.clone(),
1698                    )
1699                    .await
1700                    {
1701                        warn!(error = %err, "order update handler failed");
1702                    }
1703                }
1704                Ok(_) => {}
1705                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1706                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1707                    warn!(lag = lag, "order subscriber lagged");
1708                    continue;
1709                }
1710            }
1711        }
1712    }));
1713
1714    handles
1715}
1716
1717#[allow(clippy::too_many_arguments)]
1718async fn process_tick_event(
1719    tick: Tick,
1720    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1721    strategy_ctx: Arc<Mutex<StrategyContext>>,
1722    metrics: Arc<LiveMetrics>,
1723    alerts: Arc<AlertManager>,
1724    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1725    portfolio: Arc<Mutex<Portfolio>>,
1726    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1727    persisted: Arc<Mutex<LiveState>>,
1728    bus: Arc<EventBus>,
1729    last_data_timestamp: Arc<AtomicI64>,
1730    market_registry: Arc<MarketRegistry>,
1731) -> Result<()> {
1732    metrics.inc_tick();
1733    metrics.update_staleness(0.0);
1734    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1735    last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1736    alerts.heartbeat().await;
1737    {
1738        let mut guard = market.lock().await;
1739        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1740            snapshot.last_trade = Some(tick.price);
1741            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1742        }
1743    }
1744    let mut drawdown_triggered = false;
1745    let mut snapshot_on_trigger = None;
1746    {
1747        let mut guard = portfolio.lock().await;
1748        let was_liquidate_only = guard.liquidate_only();
1749        match guard.update_market_data(tick.symbol, tick.price) {
1750            Ok(_) => {
1751                if !was_liquidate_only && guard.liquidate_only() {
1752                    drawdown_triggered = true;
1753                    snapshot_on_trigger = Some(guard.snapshot());
1754                }
1755            }
1756            Err(err) => {
1757                warn!(
1758                    symbol = %tick.symbol,
1759                    error = %err,
1760                    "failed to refresh market data"
1761                );
1762            }
1763        }
1764    }
1765    {
1766        let mut state = persisted.lock().await;
1767        state.last_prices.insert(tick.symbol, tick.price);
1768        if drawdown_triggered {
1769            if let Some(snapshot) = snapshot_on_trigger.take() {
1770                state.portfolio = Some(snapshot);
1771            }
1772        }
1773    }
1774    if drawdown_triggered {
1775        persist_state(
1776            state_repo.clone(),
1777            persisted.clone(),
1778            Some(strategy.clone()),
1779        )
1780        .await?;
1781        alert_liquidate_only(alerts.clone()).await;
1782    }
1783    {
1784        let mut ctx = strategy_ctx.lock().await;
1785        ctx.push_tick(tick.clone());
1786        let lock_start = Instant::now();
1787        let mut strat = strategy.lock().await;
1788        log_strategy_lock("tick", lock_start.elapsed());
1789        let call_start = Instant::now();
1790        strat
1791            .on_tick(&ctx, &tick)
1792            .await
1793            .context("strategy failure on tick event")?;
1794        log_strategy_call("tick", call_start.elapsed());
1795    }
1796    emit_signals(
1797        strategy.clone(),
1798        bus.clone(),
1799        metrics.clone(),
1800        market_registry.clone(),
1801    )
1802    .await;
1803    debug!(symbol = %tick.symbol, price = %tick.price, "completed tick processing");
1804    Ok(())
1805}
1806
1807#[allow(clippy::too_many_arguments)]
1808async fn process_candle_event(
1809    candle: Candle,
1810    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1811    strategy_ctx: Arc<Mutex<StrategyContext>>,
1812    metrics: Arc<LiveMetrics>,
1813    alerts: Arc<AlertManager>,
1814    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1815    portfolio: Arc<Mutex<Portfolio>>,
1816    orchestrator: Arc<OrderOrchestrator>,
1817    exec_backend: ExecutionBackend,
1818    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1819    persisted: Arc<Mutex<LiveState>>,
1820    bus: Arc<EventBus>,
1821    last_data_timestamp: Arc<AtomicI64>,
1822    market_registry: Arc<MarketRegistry>,
1823) -> Result<()> {
1824    metrics.inc_candle();
1825    metrics.update_staleness(0.0);
1826    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1827    last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1828    alerts.heartbeat().await;
1829    let candle_label = candle.symbol.code().to_string();
1830    metrics.update_price(&candle_label, candle.close.to_f64().unwrap_or(0.0));
1831    {
1832        let mut guard = market.lock().await;
1833        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1834            snapshot.last_candle = Some(candle.clone());
1835            snapshot.last_trade = Some(candle.close);
1836        }
1837    }
1838    if exec_backend.is_paper() {
1839        let client = orchestrator.execution_engine().client();
1840        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1841            paper.update_price(&candle.symbol, candle.close);
1842        }
1843    }
1844    let mut candle_drawdown_triggered = false;
1845    let mut candle_snapshot = None;
1846    {
1847        let mut guard = portfolio.lock().await;
1848        let was_liquidate_only = guard.liquidate_only();
1849        match guard.update_market_data(candle.symbol, candle.close) {
1850            Ok(_) => {
1851                if !was_liquidate_only && guard.liquidate_only() {
1852                    candle_drawdown_triggered = true;
1853                    candle_snapshot = Some(guard.snapshot());
1854                }
1855            }
1856            Err(err) => {
1857                warn!(
1858                    symbol = %candle.symbol,
1859                    error = %err,
1860                    "failed to refresh market data"
1861                );
1862            }
1863        }
1864    }
1865    if candle_drawdown_triggered {
1866        if let Some(snapshot) = candle_snapshot.take() {
1867            let mut persisted_guard = persisted.lock().await;
1868            persisted_guard.portfolio = Some(snapshot);
1869        }
1870        alert_liquidate_only(alerts.clone()).await;
1871    }
1872    {
1873        let mut ctx = strategy_ctx.lock().await;
1874        ctx.push_candle(candle.clone());
1875        let lock_start = Instant::now();
1876        let mut strat = strategy.lock().await;
1877        log_strategy_lock("candle", lock_start.elapsed());
1878        let call_start = Instant::now();
1879        strat
1880            .on_candle(&ctx, &candle)
1881            .await
1882            .context("strategy failure on candle event")?;
1883        log_strategy_call("candle", call_start.elapsed());
1884    }
1885    {
1886        let mut snapshot = persisted.lock().await;
1887        snapshot.last_candle_ts = Some(candle.timestamp);
1888        snapshot.last_prices.insert(candle.symbol, candle.close);
1889    }
1890    persist_state(
1891        state_repo.clone(),
1892        persisted.clone(),
1893        Some(strategy.clone()),
1894    )
1895    .await?;
1896    let ctx = shared_risk_context(
1897        candle.symbol,
1898        &portfolio,
1899        &market,
1900        &persisted,
1901        &market_registry,
1902    )
1903    .await;
1904    orchestrator.update_risk_context(candle.symbol, ctx);
1905    emit_signals(
1906        strategy.clone(),
1907        bus.clone(),
1908        metrics.clone(),
1909        market_registry.clone(),
1910    )
1911    .await;
1912    debug!(symbol = %candle.symbol, close = %candle.close, "completed candle processing");
1913    Ok(())
1914}
1915
1916#[allow(clippy::too_many_arguments)]
1917async fn process_order_book_event(
1918    mut book: OrderBook,
1919    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1920    strategy_ctx: Arc<Mutex<StrategyContext>>,
1921    metrics: Arc<LiveMetrics>,
1922    alerts: Arc<AlertManager>,
1923    _market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1924    bus: Arc<EventBus>,
1925    last_data_timestamp: Arc<AtomicI64>,
1926    driver: Arc<String>,
1927    market_registry: Arc<MarketRegistry>,
1928) -> Result<()> {
1929    metrics.update_staleness(0.0);
1930    alerts.heartbeat().await;
1931    last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1932    let driver_name = driver.as_str();
1933    let local_checksum = if let Some(cs) = book.local_checksum {
1934        cs
1935    } else {
1936        let computed = book.computed_checksum(None);
1937        book.local_checksum = Some(computed);
1938        computed
1939    };
1940    let symbol_label = book.symbol.code().to_string();
1941    if let Some(expected) = book.exchange_checksum {
1942        if expected != local_checksum {
1943            metrics.inc_checksum_mismatch(driver_name, &symbol_label);
1944            alerts
1945                .order_book_checksum_mismatch(driver_name, &symbol_label, expected, local_checksum)
1946                .await;
1947        }
1948    }
1949    {
1950        let mut ctx = strategy_ctx.lock().await;
1951        ctx.push_order_book(book.clone());
1952        let lock_start = Instant::now();
1953        let mut strat = strategy.lock().await;
1954        log_strategy_lock("order_book", lock_start.elapsed());
1955        let call_start = Instant::now();
1956        strat
1957            .on_order_book(&ctx, &book)
1958            .await
1959            .context("strategy failure on order book")?;
1960        log_strategy_call("order_book", call_start.elapsed());
1961    }
1962    emit_signals(
1963        strategy.clone(),
1964        bus.clone(),
1965        metrics.clone(),
1966        market_registry.clone(),
1967    )
1968    .await;
1969    Ok(())
1970}
1971
1972#[allow(clippy::too_many_arguments)]
1973async fn process_signal_event(
1974    signal: Signal,
1975    orchestrator: Arc<OrderOrchestrator>,
1976    portfolio: Arc<Mutex<Portfolio>>,
1977    market: Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
1978    persisted: Arc<Mutex<LiveState>>,
1979    alerts: Arc<AlertManager>,
1980    metrics: Arc<LiveMetrics>,
1981    market_registry: Arc<MarketRegistry>,
1982) -> Result<()> {
1983    let ctx = shared_risk_context(
1984        signal.symbol,
1985        &portfolio,
1986        &market,
1987        &persisted,
1988        &market_registry,
1989    )
1990    .await;
1991    orchestrator.update_risk_context(signal.symbol, ctx);
1992    match orchestrator.on_signal(&signal, &ctx).await {
1993        Ok(()) => {
1994            alerts.reset_order_failures().await;
1995        }
1996        Err(err) => {
1997            metrics.inc_order_failure();
1998            metrics.inc_router_failure("orchestrator");
1999            alerts
2000                .order_failure(&format!("orchestrator error: {err}"))
2001                .await;
2002        }
2003    }
2004    Ok(())
2005}
2006
2007fn log_strategy_lock(event: &str, wait: Duration) {
2008    let wait_ms = wait.as_secs_f64() * 1000.0;
2009    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
2010        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
2011    } else {
2012        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
2013    }
2014}
2015
2016fn log_strategy_call(event: &str, elapsed: Duration) {
2017    let duration_ms = elapsed.as_secs_f64() * 1000.0;
2018    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
2019        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
2020    } else {
2021        trace!(target: "strategy", event, duration_ms, "strategy call completed");
2022    }
2023}
2024
2025#[allow(clippy::too_many_arguments)]
2026async fn process_fill_event(
2027    fill: Fill,
2028    portfolio: Arc<Mutex<Portfolio>>,
2029    strategy: Arc<Mutex<Box<dyn Strategy>>>,
2030    strategy_ctx: Arc<Mutex<StrategyContext>>,
2031    orchestrator: Arc<OrderOrchestrator>,
2032    metrics: Arc<LiveMetrics>,
2033    alerts: Arc<AlertManager>,
2034    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2035    persisted: Arc<Mutex<LiveState>>,
2036) -> Result<()> {
2037    let mut drawdown_triggered = false;
2038    {
2039        let mut guard = portfolio.lock().await;
2040        let was_liquidate_only = guard.liquidate_only();
2041        guard
2042            .apply_fill(&fill)
2043            .with_context(|| format!("Failed to apply fill to portfolio for {}", fill.symbol))?;
2044        if !was_liquidate_only && guard.liquidate_only() {
2045            drawdown_triggered = true;
2046        }
2047        let snapshot = guard.snapshot();
2048        let mut persisted_guard = persisted.lock().await;
2049        persisted_guard.portfolio = Some(snapshot);
2050    }
2051    {
2052        let positions = {
2053            let guard = portfolio.lock().await;
2054            guard.positions()
2055        };
2056        let mut ctx = strategy_ctx.lock().await;
2057        ctx.update_positions(positions);
2058    }
2059    orchestrator.on_fill(&fill).await.ok();
2060    {
2061        let ctx = strategy_ctx.lock().await;
2062        let lock_start = Instant::now();
2063        let mut strat = strategy.lock().await;
2064        log_strategy_lock("fill", lock_start.elapsed());
2065        let call_start = Instant::now();
2066        strat
2067            .on_fill(&ctx, &fill)
2068            .await
2069            .context("Strategy failed on fill event")?;
2070        log_strategy_call("fill", call_start.elapsed());
2071    }
2072    let equity = {
2073        let guard = portfolio.lock().await;
2074        guard.equity()
2075    };
2076    if let Some(value) = equity.to_f64() {
2077        metrics.update_equity(value);
2078    }
2079    alerts.update_equity(equity).await;
2080    metrics.inc_order();
2081    alerts
2082        .notify(
2083            "Order Filled",
2084            &format!(
2085                "order filled: {}@{} ({})",
2086                fill.fill_quantity,
2087                fill.fill_price,
2088                match fill.side {
2089                    Side::Buy => "buy",
2090                    Side::Sell => "sell",
2091                }
2092            ),
2093        )
2094        .await;
2095    if drawdown_triggered {
2096        alert_liquidate_only(alerts.clone()).await;
2097    }
2098    persist_state(
2099        state_repo.clone(),
2100        persisted.clone(),
2101        Some(strategy.clone()),
2102    )
2103    .await?;
2104    Ok(())
2105}
2106
2107async fn process_order_update_event(
2108    order: Order,
2109    orchestrator: Arc<OrderOrchestrator>,
2110    alerts: Arc<AlertManager>,
2111    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2112    persisted: Arc<Mutex<LiveState>>,
2113) -> Result<()> {
2114    orchestrator.on_order_update(&order).await;
2115    if matches!(order.status, OrderStatus::Rejected) {
2116        error!(
2117            order_id = %order.id,
2118            symbol = %order.request.symbol,
2119            "order rejected by exchange"
2120        );
2121        alerts.order_failure("order rejected by exchange").await;
2122        alerts
2123            .notify(
2124                "Order rejected",
2125                &format!(
2126                    "Order {} for {} was rejected",
2127                    order.id, order.request.symbol
2128                ),
2129            )
2130            .await;
2131    }
2132    {
2133        let mut snapshot = persisted.lock().await;
2134        let mut found = false;
2135        for existing in &mut snapshot.open_orders {
2136            if existing.id == order.id {
2137                *existing = order.clone();
2138                found = true;
2139                break;
2140            }
2141        }
2142        if !found {
2143            snapshot.open_orders.push(order.clone());
2144        }
2145        if matches!(
2146            order.status,
2147            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
2148        ) {
2149            snapshot.open_orders.retain(|o| o.id != order.id);
2150        }
2151    }
2152    persist_state(state_repo, persisted, None).await?;
2153    Ok(())
2154}
2155
2156async fn emit_signals(
2157    strategy: Arc<Mutex<Box<dyn Strategy>>>,
2158    bus: Arc<EventBus>,
2159    metrics: Arc<LiveMetrics>,
2160    market_registry: Arc<MarketRegistry>,
2161) {
2162    let signals = {
2163        let mut strat = strategy.lock().await;
2164        let drained = strat.drain_signals();
2165        debug!(count = drained.len(), "strategy drained signals");
2166        drained
2167    };
2168    if signals.is_empty() {
2169        return;
2170    }
2171    metrics.inc_signals(signals.len());
2172    let mut normalized = signals;
2173    normalize_group_quantities(&mut normalized, &market_registry);
2174    for signal in normalized {
2175        debug!(id = %signal.id, symbol = %signal.symbol, kind = ?signal.kind, "publishing signal event");
2176        bus.publish(Event::Signal(SignalEvent { signal }));
2177    }
2178}
2179
2180fn normalize_group_quantities(signals: &mut [Signal], registry: &MarketRegistry) {
2181    use std::collections::HashMap;
2182
2183    assign_implicit_group_ids(signals);
2184
2185    let mut groups: HashMap<Uuid, Vec<usize>> = HashMap::new();
2186    for (idx, signal) in signals.iter().enumerate() {
2187        if let Some(group_id) = signal.group_id {
2188            groups.entry(group_id).or_default().push(idx);
2189        }
2190    }
2191    for indices in groups.values() {
2192        if indices.len() < 2 {
2193            continue;
2194        }
2195        let mut quantity = indices
2196            .iter()
2197            .filter_map(|idx| signals[*idx].quantity)
2198            .find(|qty| *qty > Decimal::ZERO);
2199        let mut step = Decimal::ZERO;
2200        for idx in indices {
2201            let symbol = signals[*idx].symbol;
2202            let Some(instr) = registry.get(symbol) else {
2203                quantity = None;
2204                break;
2205            };
2206            if instr.lot_size > step {
2207                step = instr.lot_size;
2208            }
2209        }
2210        let Some(mut qty) = quantity else {
2211            continue;
2212        };
2213        if step > Decimal::ZERO {
2214            qty = (qty / step).floor() * step;
2215        }
2216        if qty <= Decimal::ZERO {
2217            continue;
2218        }
2219        for idx in indices {
2220            signals[*idx].quantity = Some(qty);
2221        }
2222    }
2223}
2224
2225fn assign_implicit_group_ids(signals: &mut [Signal]) {
2226    use std::collections::HashMap;
2227
2228    let mut note_groups: HashMap<String, Vec<usize>> = HashMap::new();
2229    for (idx, signal) in signals.iter().enumerate() {
2230        if signal.group_id.is_some() {
2231            continue;
2232        }
2233        if let Some(note) = signal.note.as_deref() {
2234            if !note.is_empty() {
2235                note_groups.entry(note.to_string()).or_default().push(idx);
2236            }
2237        }
2238    }
2239    for indices in note_groups.values() {
2240        if indices.len() < 2 {
2241            continue;
2242        }
2243        let group = Uuid::new_v4();
2244        for idx in indices {
2245            signals[*idx].group_id = Some(group);
2246        }
2247    }
2248
2249    let mut untagged: Vec<usize> = signals
2250        .iter()
2251        .enumerate()
2252        .filter(|(_, signal)| signal.group_id.is_none())
2253        .map(|(idx, _)| idx)
2254        .collect();
2255    if untagged.len() == 2 {
2256        let a = signals[untagged[0]].kind;
2257        let b = signals[untagged[1]].kind;
2258        if signal_kind_family(a) == signal_kind_family(b) && signal_kind_family(a).is_some() {
2259            let group = Uuid::new_v4();
2260            for idx in untagged.drain(..) {
2261                signals[idx].group_id = Some(group);
2262            }
2263        }
2264    }
2265}
2266
2267fn signal_kind_family(kind: SignalKind) -> Option<u8> {
2268    match kind {
2269        SignalKind::EnterLong | SignalKind::EnterShort => Some(0),
2270        SignalKind::ExitLong | SignalKind::ExitShort | SignalKind::Flatten => Some(1),
2271    }
2272}
2273
2274async fn persist_state(
2275    repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
2276    persisted: Arc<Mutex<LiveState>>,
2277    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
2278) -> Result<()> {
2279    if let Some(strat_lock) = strategy {
2280        // Snapshot strategy state before cloning the full state for persistence
2281        let strat = strat_lock.lock().await;
2282        if let Ok(json_state) = strat.snapshot() {
2283            let mut guard = persisted.lock().await;
2284            guard.strategy_state = Some(json_state);
2285        } else {
2286            warn!("failed to snapshot strategy state");
2287        }
2288    }
2289
2290    let snapshot = {
2291        let guard = persisted.lock().await;
2292        guard.clone()
2293    };
2294    tokio::task::spawn_blocking(move || repo.save(&snapshot))
2295        .await
2296        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
2297        .map_err(|err| anyhow!(err.to_string()))
2298}
2299
2300async fn shared_risk_context(
2301    symbol: Symbol,
2302    portfolio: &Arc<Mutex<Portfolio>>,
2303    market: &Arc<Mutex<HashMap<Symbol, MarketSnapshot>>>,
2304    persisted: &Arc<Mutex<LiveState>>,
2305    registry: &Arc<MarketRegistry>,
2306) -> RiskContext {
2307    let instrument = registry.get(symbol);
2308    let (instrument_kind, base_asset, quote_asset, settlement_asset) = instrument
2309        .map(|instrument| {
2310            (
2311                Some(instrument.kind),
2312                instrument.base,
2313                instrument.quote,
2314                instrument.settlement_currency,
2315            )
2316        })
2317        .unwrap_or((
2318            None,
2319            AssetId::unspecified(),
2320            AssetId::unspecified(),
2321            AssetId::unspecified(),
2322        ));
2323    let (
2324        signed_qty,
2325        equity,
2326        venue_equity,
2327        liquidate_only,
2328        base_available,
2329        quote_available,
2330        settlement_available,
2331    ) = {
2332        let guard = portfolio.lock().await;
2333        (
2334            guard.signed_position_qty(symbol),
2335            guard.equity(),
2336            guard.exchange_equity(symbol.exchange),
2337            guard.liquidate_only(),
2338            guard
2339                .balance(base_asset)
2340                .map(|cash| cash.quantity)
2341                .unwrap_or_default(),
2342            guard
2343                .balance(quote_asset)
2344                .map(|cash| cash.quantity)
2345                .unwrap_or_default(),
2346            guard
2347                .balance(settlement_asset)
2348                .map(|cash| cash.quantity)
2349                .unwrap_or_default(),
2350        )
2351    };
2352    let observed_price = {
2353        let guard = market.lock().await;
2354        guard.get(&symbol).and_then(|snapshot| snapshot.price())
2355    };
2356    let last_price = if let Some(price) = observed_price {
2357        price
2358    } else {
2359        let guard = persisted.lock().await;
2360        guard
2361            .last_prices
2362            .get(&symbol)
2363            .copied()
2364            .unwrap_or(Decimal::ZERO)
2365    };
2366    RiskContext {
2367        symbol,
2368        exchange: symbol.exchange,
2369        signed_position_qty: signed_qty,
2370        portfolio_equity: equity,
2371        exchange_equity: venue_equity,
2372        last_price,
2373        liquidate_only,
2374        instrument_kind,
2375        base_asset,
2376        quote_asset,
2377        settlement_asset,
2378        base_available,
2379        quote_available,
2380        settlement_available,
2381    }
2382}
2383
2384async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
2385    alerts
2386        .notify(
2387            "Max drawdown triggered",
2388            "Portfolio entered liquidate-only mode; new exposure blocked until review",
2389        )
2390        .await;
2391}
2392
2393fn spawn_connection_monitor(
2394    shutdown: ShutdownSignal,
2395    flag: Arc<AtomicBool>,
2396    metrics: Arc<LiveMetrics>,
2397    stream: &'static str,
2398) -> JoinHandle<()> {
2399    tokio::spawn(async move {
2400        loop {
2401            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
2402            if !shutdown.sleep(Duration::from_secs(5)).await {
2403                break;
2404            }
2405        }
2406    })
2407}
2408
2409fn spawn_order_timeout_monitor(
2410    orchestrator: Arc<OrderOrchestrator>,
2411    bus: Arc<EventBus>,
2412    alerts: Arc<AlertManager>,
2413    shutdown: ShutdownSignal,
2414) -> JoinHandle<()> {
2415    tokio::spawn(async move {
2416        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
2417        loop {
2418            ticker.tick().await;
2419            if shutdown.triggered() {
2420                break;
2421            }
2422            match orchestrator.poll_stale_orders().await {
2423                Ok(updates) => {
2424                    for order in updates {
2425                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
2426                            let message = format!(
2427                                "Order {} for {} timed out after {}s",
2428                                order.id,
2429                                order.request.symbol,
2430                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
2431                            );
2432                            error!(%message);
2433                            alerts.order_failure(&message).await;
2434                            alerts.notify("Order timeout", &message).await;
2435                        }
2436                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
2437                    }
2438                }
2439                Err(err) => {
2440                    warn!(error = %err, "order timeout monitor failed");
2441                }
2442            }
2443        }
2444    })
2445}
2446
2447async fn load_market_registry(
2448    client: Arc<dyn ExecutionClient>,
2449    settings: &LiveSessionSettings,
2450) -> Result<Arc<MarketRegistry>> {
2451    let mut catalog = InstrumentCatalog::new();
2452    let mut loaded_local = false;
2453    if let Some(path) = &settings.markets_file {
2454        catalog
2455            .add_file(path)
2456            .with_context(|| format!("failed to load markets from {}", path.display()))?;
2457        loaded_local = true;
2458    }
2459
2460    if !loaded_local && !settings.exec_backend.is_paper() {
2461        let instruments = client
2462            .list_instruments(settings.category.as_path())
2463            .await
2464            .context("failed to fetch instruments from execution client")?;
2465        catalog
2466            .add_instruments(instruments)
2467            .map_err(|err| anyhow!(err.to_string()))?;
2468    } else if catalog.is_empty() {
2469        return Err(anyhow!(
2470            "paper execution requires --markets-file when exchange metadata is unavailable"
2471        ));
2472    }
2473
2474    if catalog.is_empty() {
2475        return Err(anyhow!(
2476            "no market metadata available; supply --markets-file or use a live exchange"
2477        ));
2478    }
2479
2480    let registry = catalog.build().map_err(|err| anyhow!(err.to_string()))?;
2481    Ok(Arc::new(registry))
2482}
2483
2484#[cfg(feature = "bybit")]
2485#[allow(clippy::too_many_arguments)]
2486fn spawn_bybit_private_stream(
2487    creds: BybitCredentials,
2488    ws_url: String,
2489    private_tx: mpsc::Sender<BrokerEvent>,
2490    exec_client: Arc<dyn ExecutionClient>,
2491    symbols: Vec<Symbol>,
2492    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
2493    private_connection_flag: Option<Arc<AtomicBool>>,
2494    metrics: Arc<LiveMetrics>,
2495    router: Option<Arc<RouterExecutionClient>>,
2496    shutdown: ShutdownSignal,
2497) {
2498    let exchange_id = exec_client
2499        .as_any()
2500        .downcast_ref::<BybitClient>()
2501        .map(|client| client.exchange())
2502        .unwrap_or(ExchangeId::UNSPECIFIED);
2503    let venue_symbols: Vec<Symbol> = symbols
2504        .iter()
2505        .copied()
2506        .filter(|symbol| symbol.exchange == exchange_id)
2507        .collect();
2508    tokio::spawn(async move {
2509        loop {
2510            match tesser_bybit::ws::connect_private(
2511                &ws_url,
2512                &creds,
2513                private_connection_flag.clone(),
2514            )
2515            .await
2516            {
2517                Ok(mut socket) => {
2518                    if let Some(flag) = &private_connection_flag {
2519                        flag.store(true, Ordering::SeqCst);
2520                    }
2521                    metrics.update_connection_status("private", true);
2522                    info!("Connected to Bybit private WebSocket stream");
2523                    for symbol in &venue_symbols {
2524                        match exec_client.list_open_orders(*symbol).await {
2525                            Ok(orders) => {
2526                                for mut order in orders {
2527                                    if let Some(router) = &router {
2528                                        order = router.normalize_order_event(exchange_id, order);
2529                                    }
2530                                    if let Err(err) =
2531                                        private_tx.send(BrokerEvent::OrderUpdate(order)).await
2532                                    {
2533                                        error!("failed to send reconciled order update: {err}");
2534                                    }
2535                                }
2536                            }
2537                            Err(e) => {
2538                                error!(
2539                                    "failed to reconcile open orders for {}: {e}",
2540                                    symbol.code()
2541                                );
2542                            }
2543                        }
2544                    }
2545                    if let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() {
2546                        let since = {
2547                            let guard = last_sync.lock().await;
2548                            guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
2549                        };
2550                        match bybit.list_executions_since(since).await {
2551                            Ok(fills) => {
2552                                for mut fill in fills {
2553                                    if let Some(router) = &router {
2554                                        match router.normalize_fill_event(exchange_id, fill) {
2555                                            Some(normalized) => fill = normalized,
2556                                            None => {
2557                                                metrics.inc_router_failure("orphan_fill");
2558                                                continue;
2559                                            }
2560                                        }
2561                                    }
2562                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await
2563                                    {
2564                                        error!("failed to send reconciled fill: {err}");
2565                                    }
2566                                }
2567                            }
2568                            Err(e) => {
2569                                error!("failed to reconcile executions since {:?}: {}", since, e);
2570                            }
2571                        }
2572                        let mut guard = last_sync.lock().await;
2573                        *guard = Some(Utc::now());
2574                    }
2575
2576                    while let Some(msg) = socket.next().await {
2577                        if shutdown.triggered() {
2578                            break;
2579                        }
2580                        if let Ok(Message::Text(text)) = msg {
2581                            if let Ok(value) = serde_json::from_str::<serde_json::Value>(&text) {
2582                                if let Some(topic) = value.get("topic").and_then(|v| v.as_str()) {
2583                                    match topic {
2584                                        "order" => {
2585                                            if let Ok(msg) = serde_json::from_value::<
2586                                                PrivateMessage<BybitWsOrder>,
2587                                            >(
2588                                                value.clone()
2589                                            ) {
2590                                                for update in msg.data {
2591                                                    if let Ok(mut order) =
2592                                                        update.to_tesser_order(exchange_id, None)
2593                                                    {
2594                                                        if let Some(router) = &router {
2595                                                            order = router.normalize_order_event(
2596                                                                exchange_id,
2597                                                                order,
2598                                                            );
2599                                                        }
2600                                                        if let Err(err) = private_tx
2601                                                            .send(BrokerEvent::OrderUpdate(order))
2602                                                            .await
2603                                                        {
2604                                                            error!(
2605                                                                "failed to send private order update: {err}"
2606                                                            );
2607                                                        }
2608                                                    }
2609                                                }
2610                                            }
2611                                        }
2612                                        "execution" => {
2613                                            if let Ok(msg) = serde_json::from_value::<
2614                                                PrivateMessage<BybitWsExecution>,
2615                                            >(
2616                                                value.clone()
2617                                            ) {
2618                                                for exec in msg.data {
2619                                                    if let Ok(mut fill) =
2620                                                        exec.to_tesser_fill(exchange_id)
2621                                                    {
2622                                                        if let Some(router) = &router {
2623                                                            match router.normalize_fill_event(
2624                                                                exchange_id,
2625                                                                fill,
2626                                                            ) {
2627                                                                Some(normalized) => {
2628                                                                    fill = normalized
2629                                                                }
2630                                                                None => {
2631                                                                    metrics.inc_router_failure(
2632                                                                        "orphan_fill",
2633                                                                    );
2634                                                                    continue;
2635                                                                }
2636                                                            }
2637                                                        }
2638                                                        if let Err(err) = private_tx
2639                                                            .send(BrokerEvent::Fill(fill))
2640                                                            .await
2641                                                        {
2642                                                            error!(
2643                                                                "failed to send private fill event: {err}"
2644                                                            );
2645                                                        }
2646                                                    }
2647                                                }
2648                                            }
2649                                        }
2650                                        _ => {}
2651                                    }
2652                                }
2653                            }
2654                        }
2655                    }
2656                }
2657                Err(e) => {
2658                    if let Some(flag) = &private_connection_flag {
2659                        flag.store(false, Ordering::SeqCst);
2660                    }
2661                    metrics.update_connection_status("private", false);
2662                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
2663                    tokio::time::sleep(Duration::from_secs(5)).await;
2664                }
2665            }
2666            if shutdown.triggered() {
2667                break;
2668            }
2669        }
2670    });
2671}
2672
2673#[cfg(feature = "binance")]
2674#[allow(clippy::too_many_arguments)]
2675fn spawn_binance_private_stream(
2676    exec_client: Arc<dyn ExecutionClient>,
2677    ws_url: String,
2678    private_tx: mpsc::Sender<BrokerEvent>,
2679    private_connection_flag: Option<Arc<AtomicBool>>,
2680    metrics: Arc<LiveMetrics>,
2681    router: Option<Arc<RouterExecutionClient>>,
2682    shutdown: ShutdownSignal,
2683) {
2684    let router_handle = router.clone();
2685    tokio::spawn(async move {
2686        let router = router_handle;
2687        loop {
2688            let Some(binance) = exec_client
2689                .as_ref()
2690                .as_any()
2691                .downcast_ref::<BinanceClient>()
2692            else {
2693                warn!("execution client is not Binance");
2694                return;
2695            };
2696            let exchange = binance.exchange();
2697            let listen_key = match binance.start_user_stream().await {
2698                Ok(key) => key,
2699                Err(err) => {
2700                    error!("failed to start Binance user stream: {err}");
2701                    tokio::time::sleep(Duration::from_secs(5)).await;
2702                    continue;
2703                }
2704            };
2705            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
2706                Ok(user_stream) => {
2707                    if let Some(flag) = &private_connection_flag {
2708                        flag.store(true, Ordering::SeqCst);
2709                    }
2710                    metrics.update_connection_status("private", true);
2711                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
2712                    let tx_orders = private_tx.clone();
2713                    let exchange_id = exchange;
2714                    let router_for_event = router.clone();
2715                    let metrics_for_event = metrics.clone();
2716                    user_stream.on_event(move |event| {
2717                        if let Some(update) = extract_order_update(&event) {
2718                            if let Some(mut order) = order_from_update(exchange_id, update) {
2719                                if let Some(router) = &router_for_event {
2720                                    order = router.normalize_order_event(exchange_id, order);
2721                                }
2722                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
2723                            }
2724                            if let Some(mut fill) = fill_from_update(exchange_id, update) {
2725                                if let Some(router) = &router_for_event {
2726                                    match router.normalize_fill_event(exchange_id, fill) {
2727                                        Some(normalized) => fill = normalized,
2728                                        None => {
2729                                            metrics_for_event.inc_router_failure("orphan_fill");
2730                                            return;
2731                                        }
2732                                    }
2733                                }
2734                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
2735                            }
2736                        }
2737                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
2738                            let _ = reconnect_tx.try_send(());
2739                        }
2740                    });
2741                    let keepalive_client = exec_client.clone();
2742                    let keepalive_handle = tokio::spawn(async move {
2743                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
2744                        loop {
2745                            interval.tick().await;
2746                            let Some(client) = keepalive_client
2747                                .as_ref()
2748                                .as_any()
2749                                .downcast_ref::<BinanceClient>()
2750                            else {
2751                                break;
2752                            };
2753                            if client.keepalive_user_stream().await.is_err() {
2754                                break;
2755                            }
2756                        }
2757                    });
2758                    tokio::select! {
2759                        _ = reconnect_rx.recv() => {
2760                            warn!("binance listen key expired; reconnecting");
2761                        }
2762                        _ = shutdown.wait() => {
2763                            keepalive_handle.abort();
2764                            let _ = user_stream.unsubscribe().await;
2765                            return;
2766                        }
2767                    }
2768                    keepalive_handle.abort();
2769                    let _ = user_stream.unsubscribe().await;
2770                }
2771                Err(err) => {
2772                    error!("failed to connect to Binance user stream: {err}");
2773                }
2774            }
2775            if let Some(flag) = &private_connection_flag {
2776                flag.store(false, Ordering::SeqCst);
2777            }
2778            metrics.update_connection_status("private", false);
2779            if shutdown.triggered() {
2780                break;
2781            }
2782            tokio::time::sleep(Duration::from_secs(5)).await;
2783        }
2784    });
2785}
2786
2787#[cfg(test)]
2788mod tests {
2789    use super::*;
2790    use std::collections::VecDeque;
2791    use tesser_core::OrderBookLevel;
2792
2793    struct StaticStream {
2794        ticks: VecDeque<Tick>,
2795        candles: VecDeque<Candle>,
2796        books: VecDeque<OrderBook>,
2797    }
2798
2799    impl StaticStream {
2800        fn new(ticks: Vec<Tick>, candles: Vec<Candle>, books: Vec<OrderBook>) -> Self {
2801            Self {
2802                ticks: ticks.into(),
2803                candles: candles.into(),
2804                books: books.into(),
2805            }
2806        }
2807    }
2808
2809    #[async_trait::async_trait]
2810    impl LiveMarketStream for StaticStream {
2811        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
2812            Ok(self.ticks.pop_front())
2813        }
2814
2815        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
2816            Ok(self.candles.pop_front())
2817        }
2818
2819        async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
2820            Ok(self.books.pop_front())
2821        }
2822    }
2823
2824    fn build_tick(exchange: &str, price: i64) -> Tick {
2825        Tick {
2826            symbol: Symbol::from(exchange),
2827            price: Decimal::from(price),
2828            size: Decimal::ONE,
2829            side: Side::Buy,
2830            exchange_timestamp: Utc::now(),
2831            received_at: Utc::now(),
2832        }
2833    }
2834
2835    fn build_candle(exchange: &str, close: i64) -> Candle {
2836        Candle {
2837            symbol: Symbol::from(exchange),
2838            interval: Interval::OneMinute,
2839            open: Decimal::from(close),
2840            high: Decimal::from(close),
2841            low: Decimal::from(close),
2842            close: Decimal::from(close),
2843            volume: Decimal::ONE,
2844            timestamp: Utc::now(),
2845        }
2846    }
2847
2848    fn build_book(exchange: &str, price: i64) -> OrderBook {
2849        OrderBook {
2850            symbol: Symbol::from(exchange),
2851            bids: vec![OrderBookLevel {
2852                price: Decimal::from(price),
2853                size: Decimal::ONE,
2854            }],
2855            asks: vec![OrderBookLevel {
2856                price: Decimal::from(price + 1),
2857                size: Decimal::ONE,
2858            }],
2859            timestamp: Utc::now(),
2860            exchange_checksum: None,
2861            local_checksum: None,
2862        }
2863    }
2864
2865    #[tokio::test]
2866    async fn router_market_stream_fans_in_events() {
2867        let shutdown = ShutdownSignal::new();
2868        let stream_a = Box::new(StaticStream::new(
2869            vec![build_tick("A", 1), build_tick("A", 2)],
2870            vec![build_candle("A", 10)],
2871            vec![build_book("A", 5)],
2872        ));
2873        let stream_b = Box::new(StaticStream::new(
2874            vec![build_tick("B", 3)],
2875            vec![build_candle("B", 20)],
2876            vec![build_book("B", 15)],
2877        ));
2878        let mut router = RouterMarketStream::new(
2879            vec![("A".into(), stream_a), ("B".into(), stream_b)],
2880            shutdown.clone(),
2881        );
2882
2883        let first = router.next_tick().await.unwrap().unwrap();
2884        let second = router.next_tick().await.unwrap().unwrap();
2885        let third = router.next_tick().await.unwrap().unwrap();
2886        assert_eq!(first.symbol, Symbol::from("A"));
2887        assert_eq!(second.symbol, Symbol::from("A"));
2888        assert_eq!(third.symbol, Symbol::from("B"));
2889
2890        let candle_a = router.next_candle().await.unwrap().unwrap();
2891        let candle_b = router.next_candle().await.unwrap().unwrap();
2892        assert_eq!(candle_a.symbol, Symbol::from("A"));
2893        assert_eq!(candle_b.symbol, Symbol::from("B"));
2894
2895        let book_a = router.next_order_book().await.unwrap().unwrap();
2896        let book_b = router.next_order_book().await.unwrap().unwrap();
2897        assert_eq!(book_a.bids[0].price, Decimal::from(5));
2898        assert_eq!(book_b.asks[0].price, Decimal::from(16));
2899
2900        shutdown.trigger();
2901    }
2902}