tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, AtomicI64, Ordering},
6    Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23    static INIT: Once = Once::new();
24    INIT.call_once(|| {
25        register_connector_factory(Arc::new(PaperFactory::default()));
26        #[cfg(feature = "bybit")]
27        register_bybit_factory();
28        #[cfg(feature = "binance")]
29        register_binance_factory();
30    });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35    fill_from_update, order_from_update, register_factory as register_binance_factory,
36    ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37    BinanceClient,
38};
39use tesser_broker::{
40    get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41    ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
48use tesser_core::{
49    AccountBalance, Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price,
50    Quantity, Side, Signal, Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55    TickEvent,
56};
57use tesser_execution::{
58    AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
59    PreTradeRiskChecker, RiskContext, RiskLimits, SqliteAlgoStateRepository, StoredAlgoState,
60};
61use tesser_journal::LmdbJournal;
62use tesser_markets::MarketRegistry;
63use tesser_paper::{PaperExecutionClient, PaperFactory};
64use tesser_portfolio::{
65    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
66};
67use tesser_strategy::{Strategy, StrategyContext};
68
69use crate::alerts::{AlertDispatcher, AlertManager};
70use crate::control;
71use crate::telemetry::{spawn_metrics_server, LiveMetrics};
72use crate::PublicChannel;
73
/// Unified event type for asynchronous updates from the broker.
///
/// Values of this type travel over the private-event channel and are
/// re-published on the event bus by the live runtime's main loop.
#[derive(Debug)]
pub enum BrokerEvent {
    /// An order changed state on the exchange; carries the updated order.
    OrderUpdate(Order),
    /// An execution (fill) was reported for an order.
    Fill(Fill),
}
80
81#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
82#[value(rename_all = "kebab-case")]
83pub enum ExecutionBackend {
84    Paper,
85    Live,
86}
87
88impl ExecutionBackend {
89    fn is_paper(self) -> bool {
90        matches!(self, Self::Paper)
91    }
92}
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
95#[value(rename_all = "kebab-case")]
96pub enum PersistenceBackend {
97    Sqlite,
98    Lmdb,
99}
100
101impl From<PersistenceBackend> for PersistenceEngine {
102    fn from(value: PersistenceBackend) -> Self {
103        match value {
104            PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
105            PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
106        }
107    }
108}
109
/// Order book depth returned by [`default_order_book_depth`].
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order book depth (`50`).
///
/// Declared `const fn` so it can be used in constant contexts.
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}
// NOTE(review): the two thresholds below are not referenced in this part of the
// file; judging by their names they bound how long acquiring the strategy lock
// (25 ms) and a strategy callback (250 ms) may take before a warning is
// emitted — confirm against the usage site.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
117
/// Source of public market data consumed by the live runtime's main loop.
///
/// Each method yields at most one item per call. The main loop treats
/// `Ok(None)` as "nothing received on this poll" and propagates `Err` as a
/// fatal session error.
// NOTE(review): whether `Ok(None)` can also mean "stream finished" depends on
// the underlying connector — confirm against the `ConnectorStream` contract.
#[async_trait::async_trait]
trait LiveMarketStream: Send {
    /// Fetches the next trade tick, if any.
    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
    /// Fetches the next candle, if any.
    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
    /// Fetches the next order book snapshot/update, if any.
    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
}
124
/// Adapts a connector-provided [`ConnectorStream`] to the runtime's
/// [`LiveMarketStream`] trait.
struct FactoryStreamAdapter {
    /// The boxed stream created by a connector factory.
    inner: Box<dyn ConnectorStream>,
}

impl FactoryStreamAdapter {
    /// Wraps an already-created (and typically already-subscribed) connector stream.
    fn new(inner: Box<dyn ConnectorStream>) -> Self {
        Self { inner }
    }
}

// Every method is a direct delegation to the wrapped connector stream.
#[async_trait::async_trait]
impl LiveMarketStream for FactoryStreamAdapter {
    /// Forwards to `ConnectorStream::next_tick`.
    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
        self.inner.next_tick().await
    }

    /// Forwards to `ConnectorStream::next_candle`.
    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
        self.inner.next_candle().await
    }

    /// Forwards to `ConnectorStream::next_order_book`.
    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
        self.inner.next_order_book().await
    }
}
149
150#[derive(Clone)]
151pub struct PersistenceSettings {
152    pub engine: PersistenceEngine,
153    pub state_path: PathBuf,
154    pub algo_path: PathBuf,
155}
156
157impl PersistenceSettings {
158    pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
159        let algo_path = match engine {
160            PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
161            PersistenceEngine::Lmdb => state_path.clone(),
162        };
163        Self {
164            engine,
165            state_path,
166            algo_path,
167        }
168    }
169
170    fn algo_repo_path(&self) -> &PathBuf {
171        &self.algo_path
172    }
173}
174
/// Pair of repositories produced by `build_persistence_handles`.
struct PersistenceHandles {
    /// Repository for the session's `LiveState` snapshot.
    state: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Repository for execution-algorithm state, handed to the order orchestrator.
    algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
}
179
/// All tunables for a single live (or paper) trading session.
pub struct LiveSessionSettings {
    /// Public channel category; its `as_path()` value is sent in the stream metadata.
    pub category: PublicChannel,
    /// Candle interval used when subscribing to the market stream.
    pub interval: Interval,
    /// Fixed order quantity used by the order sizer; must be strictly positive.
    pub quantity: Quantity,
    /// Slippage (basis points) passed to the built-in paper execution client.
    pub slippage_bps: Decimal,
    /// Fee (basis points) passed to the built-in paper execution client.
    pub fee_bps: Decimal,
    /// History length handed to `StrategyContext::new`.
    pub history: usize,
    /// Bind address of the metrics server.
    pub metrics_addr: SocketAddr,
    /// Persistence engine and paths.
    pub persistence: PersistenceSettings,
    /// Starting balances for the portfolio configuration.
    pub initial_balances: HashMap<Symbol, Decimal>,
    /// Reporting currency for the portfolio and for reconciliation.
    pub reporting_currency: Symbol,
    /// Optional markets definition file.
    // NOTE(review): not read in this part of the file; presumably consumed by
    // `load_market_registry` — confirm.
    pub markets_file: Option<PathBuf>,
    /// Alerting configuration (including the webhook URL) for the alert manager.
    pub alerting: AlertingConfig,
    /// Whether orders are simulated or sent to the exchange.
    pub exec_backend: ExecutionBackend,
    /// Risk configuration: order/position quantity caps and max drawdown.
    pub risk: RiskManagementConfig,
    /// Period of the background reconciliation loop.
    pub reconciliation_interval: Duration,
    /// Reconciliation threshold; non-positive values are replaced by a small floor.
    pub reconciliation_threshold: Decimal,
    /// Name of the registered connector factory to use (e.g. `"paper"`, `"bybit"`, `"binance"`).
    pub driver: String,
    /// Order book depth sent in the stream metadata.
    pub orderbook_depth: usize,
    /// When set, root directory for the Parquet flight recorder.
    pub record_path: Option<PathBuf>,
    /// Bind address of the control plane server.
    pub control_addr: SocketAddr,
}
202
203impl LiveSessionSettings {
204    fn risk_limits(&self) -> RiskLimits {
205        RiskLimits {
206            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
207            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
208        }
209    }
210}
211
212fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
213    match settings.persistence.engine {
214        PersistenceEngine::Sqlite => {
215            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
216                SqliteStateRepository::new(settings.persistence.state_path.clone()),
217            );
218            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
219                SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
220            );
221            Ok(PersistenceHandles {
222                state: state_repo,
223                algo: algo_repo,
224            })
225        }
226        PersistenceEngine::Lmdb => {
227            let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
228            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
229                Arc::new(journal.state_repo());
230            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
231                Arc::new(journal.algo_repo());
232            Ok(PersistenceHandles {
233                state: state_repo,
234                algo: algo_repo,
235            })
236        }
237    }
238}
239
240pub async fn run_live(
241    strategy: Box<dyn Strategy>,
242    symbols: Vec<String>,
243    exchange: ExchangeConfig,
244    settings: LiveSessionSettings,
245) -> Result<()> {
246    run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
247}
248
249/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
250pub async fn run_live_with_shutdown(
251    strategy: Box<dyn Strategy>,
252    symbols: Vec<String>,
253    exchange: ExchangeConfig,
254    settings: LiveSessionSettings,
255    shutdown: ShutdownSignal,
256) -> Result<()> {
257    if symbols.is_empty() {
258        return Err(anyhow!("strategy did not declare any subscriptions"));
259    }
260    if settings.quantity <= Decimal::ZERO {
261        return Err(anyhow!("--quantity must be positive"));
262    }
263
264    let public_connection = Arc::new(AtomicBool::new(false));
265    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
266        Some(Arc::new(AtomicBool::new(false)))
267    } else {
268        None
269    };
270    ensure_builtin_connectors_registered();
271    let connector_payload = build_exchange_payload(&exchange, &settings);
272    let connector_factory = get_connector_factory(&settings.driver)
273        .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
274    let stream_config = ConnectorStreamConfig {
275        ws_url: Some(exchange.ws_url.clone()),
276        metadata: json!({
277            "category": settings.category.as_path(),
278            "symbols": symbols.clone(),
279            "orderbook_depth": settings.orderbook_depth,
280        }),
281        connection_status: Some(public_connection.clone()),
282    };
283    let mut connector_stream = connector_factory
284        .create_market_stream(&connector_payload, stream_config)
285        .await
286        .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
287    connector_stream
288        .subscribe(&symbols, settings.interval)
289        .await
290        .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
291    let market_stream: Box<dyn LiveMarketStream> =
292        Box::new(FactoryStreamAdapter::new(connector_stream));
293
294    let execution_client =
295        build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
296    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
297    if matches!(settings.exec_backend, ExecutionBackend::Live) {
298        info!(
299            rest = %exchange.rest_url,
300            driver = ?settings.driver,
301            "live execution enabled via {:?} REST",
302            settings.driver
303        );
304    }
305    let risk_checker: Arc<dyn PreTradeRiskChecker> =
306        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
307    let execution = ExecutionEngine::new(
308        execution_client.clone(),
309        Box::new(FixedOrderSizer {
310            quantity: settings.quantity,
311        }),
312        risk_checker,
313    );
314
315    let mut bootstrap = None;
316    if matches!(settings.exec_backend, ExecutionBackend::Live) {
317        info!("synchronizing portfolio snapshot from exchange");
318        let positions = execution_client
319            .positions()
320            .await
321            .context("failed to fetch remote positions")?;
322        let balances = execution_client
323            .account_balances()
324            .await
325            .context("failed to fetch remote account balances")?;
326        let mut open_orders = Vec::new();
327        for symbol in &symbols {
328            let mut symbol_orders = execution_client
329                .list_open_orders(symbol)
330                .await
331                .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
332            open_orders.append(&mut symbol_orders);
333        }
334        bootstrap = Some(LiveBootstrap {
335            positions,
336            balances,
337            open_orders,
338        });
339    }
340
341    let persistence = build_persistence_handles(&settings)?;
342
343    // Create orchestrator with execution engine
344    let initial_open_orders = bootstrap
345        .as_ref()
346        .map(|data| data.open_orders.clone())
347        .unwrap_or_default();
348    let orchestrator = OrderOrchestrator::new(
349        Arc::new(execution),
350        persistence.algo.clone(),
351        initial_open_orders,
352    )
353    .await?;
354
355    let runtime = LiveRuntime::new(
356        market_stream,
357        strategy,
358        symbols,
359        orchestrator,
360        persistence.state,
361        settings,
362        exchange.ws_url.clone(),
363        market_registry,
364        shutdown,
365        public_connection,
366        private_connection,
367        bootstrap,
368    )
369    .await?;
370    runtime.run().await
371}
372
373async fn build_execution_client(
374    settings: &LiveSessionSettings,
375    connector_factory: Arc<dyn ConnectorFactory>,
376    connector_payload: &Value,
377) -> Result<Arc<dyn ExecutionClient>> {
378    match settings.exec_backend {
379        ExecutionBackend::Paper => {
380            if settings.driver == "paper" {
381                return connector_factory
382                    .create_execution_client(connector_payload)
383                    .await
384                    .map_err(|err| anyhow!("failed to create execution client: {err}"));
385            }
386            Ok(Arc::new(PaperExecutionClient::new(
387                "paper".to_string(),
388                vec!["BTCUSDT".to_string()],
389                settings.slippage_bps,
390                settings.fee_bps,
391            )))
392        }
393        ExecutionBackend::Live => connector_factory
394            .create_execution_client(connector_payload)
395            .await
396            .map_err(|err| anyhow!("failed to create execution client: {err}")),
397    }
398}
399
/// Owns every resource of a running live session: the market stream, shared
/// state, and the handles of all background tasks spawned for the session.
struct LiveRuntime {
    /// Public market data source polled by the main loop.
    market: Box<dyn LiveMarketStream>,
    /// Order orchestrator; receives a timer tick from the main loop.
    orchestrator: Arc<OrderOrchestrator>,
    /// Repository used to persist `persisted`.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// In-memory copy of the persisted session state.
    persisted: Arc<Mutex<LiveState>>,
    /// Bus on which market data and broker events are published.
    event_bus: Arc<EventBus>,
    /// Optional Parquet flight recorder; shut down when the session ends.
    recorder: Option<ParquetRecorder>,
    /// Control plane server task; awaited during shutdown.
    control_task: Option<JoinHandle<()>>,
    /// Signal that ends the main loop.
    shutdown: ShutdownSignal,
    /// Metrics server task.
    metrics_task: JoinHandle<()>,
    /// Alert watchdog task, if one was spawned.
    alert_task: Option<JoinHandle<()>>,
    /// Periodic reconciliation task (non-paper backends only).
    reconciliation_task: Option<JoinHandle<()>>,
    /// Context for reconciliation (non-paper backends only); also used for the
    /// initial reconciliation when the session starts.
    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
    /// Receiving end of the private (order update / fill) event channel.
    private_event_rx: mpsc::Receiver<BrokerEvent>,
    /// Timestamp shared with the Bybit private stream; seeded from the
    /// persisted `last_candle_ts`. Not read by the runtime itself.
    #[allow(dead_code)]
    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    /// Event-bus subscriber tasks.
    subscriber_handles: Vec<JoinHandle<()>>,
    /// Tasks that watch the public/private connection flags.
    connection_monitors: Vec<JoinHandle<()>>,
    /// Task that monitors order timeouts.
    order_timeout_task: JoinHandle<()>,
    /// The strategy, shared with the subscriber tasks and used when persisting state.
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Public connection flag; held by the runtime but not read by it.
    _public_connection: Arc<AtomicBool>,
    /// Private connection flag (live backend only); held by the runtime but not read by it.
    _private_connection: Option<Arc<AtomicBool>>,
}
423
/// Snapshot fetched from the exchange before a live session starts.
struct LiveBootstrap {
    /// Remote positions, used to build the initial portfolio.
    positions: Vec<Position>,
    /// Remote account balances, used to build the initial portfolio.
    balances: Vec<AccountBalance>,
    /// Orders currently open on the exchange for the subscribed symbols.
    open_orders: Vec<Order>,
}
429
430impl LiveRuntime {
    /// Wires up a live session: restores persisted state, builds the
    /// portfolio, and spawns every background task the session needs.
    ///
    /// Portfolio source, in priority order: the exchange snapshot in
    /// `bootstrap`, then the persisted portfolio snapshot, then a fresh
    /// portfolio built from the configured initial balances.
    ///
    /// # Errors
    /// Fails when the strategy cannot restore its persisted state, or when a
    /// private stream is required (non-paper backend) but the driver is
    /// unsupported, its cargo feature is disabled, or (Bybit) the execution
    /// client has the wrong type or lacks credentials.
    // NOTE(review): tasks spawned before a late `bail!`/`?` in this function
    // (metrics server, alert watchdog, connection monitors) are not aborted on
    // the error path.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        market: Box<dyn LiveMarketStream>,
        mut strategy: Box<dyn Strategy>,
        symbols: Vec<String>,
        orchestrator: OrderOrchestrator,
        state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
        settings: LiveSessionSettings,
        #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
        market_registry: Arc<MarketRegistry>,
        shutdown: ShutdownSignal,
        public_connection: Arc<AtomicBool>,
        private_connection: Option<Arc<AtomicBool>>,
        bootstrap: Option<LiveBootstrap>,
    ) -> Result<Self> {
        let mut strategy_ctx = StrategyContext::new(settings.history);
        let driver = Arc::new(settings.driver.clone());
        // The repository load is synchronous, so it runs on the blocking pool.
        // A failed load is not fatal: the session starts from default state.
        let mut persisted = match tokio::task::spawn_blocking({
            let repo = state_repo.clone();
            move || repo.load()
        })
        .await
        {
            Ok(Ok(state)) => state,
            Ok(Err(err)) => {
                warn!(error = %err, "failed to load live state; starting from defaults");
                LiveState::default()
            }
            Err(err) => {
                warn!(error = %err, "state load task failed; starting from defaults");
                LiveState::default()
            }
        };
        // Open orders reported by the exchange replace the persisted ones.
        let mut live_bootstrap = None;
        if let Some(data) = bootstrap {
            persisted.open_orders = data.open_orders;
            live_bootstrap = Some((data.positions, data.balances));
        } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
            warn!("live session missing bootstrap data; continuing without remote snapshot");
        }

        let portfolio_cfg = PortfolioConfig {
            initial_balances: settings.initial_balances.clone(),
            reporting_currency: settings.reporting_currency.clone(),
            max_drawdown: Some(settings.risk.max_drawdown),
        };
        // Exchange snapshot > persisted snapshot > fresh portfolio.
        let portfolio = if let Some((positions, balances)) = live_bootstrap {
            Portfolio::from_exchange_state(
                positions,
                balances,
                portfolio_cfg.clone(),
                market_registry.clone(),
            )
        } else if let Some(snapshot) = persisted.portfolio.take() {
            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
        } else {
            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
        };
        strategy_ctx.update_positions(portfolio.positions());
        // Restore strategy state if found in persistence
        if let Some(state) = persisted.strategy_state.take() {
            info!("restoring strategy state from persistence");
            strategy
                .restore(state)
                .context("failed to restore strategy state")?;
        }
        // Keep the in-memory persisted state in sync with the portfolio actually in use.
        persisted.portfolio = Some(portfolio.snapshot());

        // Seed one market snapshot per symbol with the last persisted price, if any.
        let mut market_snapshots = HashMap::new();
        for symbol in &symbols {
            let mut snapshot = MarketSnapshot::default();
            if let Some(price) = persisted.last_prices.get(symbol).copied() {
                snapshot.last_trade = Some(price);
            }
            market_snapshots.insert(symbol.clone(), snapshot);
        }

        // Metrics, alerting and connection monitoring.
        let metrics = LiveMetrics::new();
        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
        if let Some(flag) = &private_connection {
            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
        }
        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
        let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
        let alerts = AlertManager::new(
            settings.alerting,
            dispatcher,
            Some(public_connection.clone()),
            private_connection.clone(),
        );
        // Channel carrying order updates and fills from the private stream to the main loop.
        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
        let alerts = Arc::new(alerts);
        let alert_task = alerts.spawn_watchdog();
        let metrics = Arc::new(metrics);
        let mut connection_monitors = Vec::new();
        connection_monitors.push(spawn_connection_monitor(
            shutdown.clone(),
            public_connection.clone(),
            metrics.clone(),
            "public",
        ));
        if let Some(flag) = private_connection.clone() {
            connection_monitors.push(spawn_connection_monitor(
                shutdown.clone(),
                flag,
                metrics.clone(),
                "private",
            ));
        }

        // Non-paper backends need a private stream for order updates and fills.
        if !settings.exec_backend.is_paper() {
            let execution_engine = orchestrator.execution_engine();
            let exec_client = execution_engine.client();
            match settings.driver.as_str() {
                // An empty driver name is treated as Bybit.
                "bybit" | "" => {
                    #[cfg(feature = "bybit")]
                    {
                        let bybit = exec_client
                            .as_ref()
                            .as_any()
                            .downcast_ref::<BybitClient>()
                            .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
                        let creds = bybit
                            .get_credentials()
                            .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
                        spawn_bybit_private_stream(
                            creds,
                            bybit.get_ws_url(),
                            private_event_tx.clone(),
                            exec_client.clone(),
                            symbols.clone(),
                            last_private_sync.clone(),
                            private_connection.clone(),
                            metrics.clone(),
                            shutdown.clone(),
                        );
                    }
                    #[cfg(not(feature = "bybit"))]
                    {
                        bail!("driver 'bybit' is unavailable without the 'bybit' feature");
                    }
                }
                "binance" => {
                    #[cfg(feature = "binance")]
                    {
                        spawn_binance_private_stream(
                            exec_client.clone(),
                            exchange_ws_url.clone(),
                            private_event_tx.clone(),
                            private_connection.clone(),
                            metrics.clone(),
                            shutdown.clone(),
                        );
                    }
                    #[cfg(not(feature = "binance"))]
                    {
                        bail!("driver 'binance' is unavailable without the 'binance' feature");
                    }
                }
                // The paper driver has no private stream.
                "paper" => {}
                other => {
                    bail!("private stream unsupported for driver '{other}'");
                }
            }
        }

        // The flight recorder is optional: a startup failure is logged and the
        // session continues without recording.
        let recorder = if let Some(record_path) = settings.record_path.clone() {
            let config = RecorderConfig {
                root: record_path.clone(),
                ..RecorderConfig::default()
            };
            match ParquetRecorder::spawn(config).await {
                Ok(recorder) => {
                    info!(path = %record_path.display(), "flight recorder enabled");
                    Some(recorder)
                }
                Err(err) => {
                    warn!(
                        error = %err,
                        path = %record_path.display(),
                        "failed to start flight recorder"
                    );
                    None
                }
            }
        } else {
            None
        };
        let recorder_handle = recorder.as_ref().map(|rec| rec.handle());

        // Wrap the state that is shared between the background tasks.
        let strategy = Arc::new(Mutex::new(strategy));
        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
        let portfolio = Arc::new(Mutex::new(portfolio));
        let market_cache = Arc::new(Mutex::new(market_snapshots));
        let persisted = Arc::new(Mutex::new(persisted));
        let orchestrator = Arc::new(orchestrator);
        let event_bus = Arc::new(EventBus::new(2048));
        let last_data_timestamp = Arc::new(AtomicI64::new(0));
        let control_task = control::spawn_control_plane(
            settings.control_addr,
            portfolio.clone(),
            orchestrator.clone(),
            persisted.clone(),
            last_data_timestamp.clone(),
            shutdown.clone(),
        );
        // Reconciliation against the exchange only applies to non-paper backends.
        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
                client: orchestrator.execution_engine().client(),
                portfolio: portfolio.clone(),
                persisted: persisted.clone(),
                state_repo: state_repo.clone(),
                alerts: alerts.clone(),
                metrics: metrics.clone(),
                reporting_currency: settings.reporting_currency.clone(),
                threshold: settings.reconciliation_threshold,
            }))
        });
        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
            spawn_reconciliation_loop(
                ctx.clone(),
                shutdown.clone(),
                settings.reconciliation_interval,
            )
        });
        let subscriber_handles = spawn_event_subscribers(
            event_bus.clone(),
            strategy.clone(),
            strategy_ctx.clone(),
            orchestrator.clone(),
            portfolio.clone(),
            metrics.clone(),
            alerts.clone(),
            market_cache.clone(),
            state_repo.clone(),
            persisted.clone(),
            settings.exec_backend,
            recorder_handle.clone(),
            last_data_timestamp.clone(),
            driver.clone(),
        );
        let order_timeout_task = spawn_order_timeout_monitor(
            orchestrator.clone(),
            event_bus.clone(),
            alerts.clone(),
            shutdown.clone(),
        );

        info!(
            symbols = ?symbols,
            category = ?settings.category,
            metrics_addr = %settings.metrics_addr,
            state_path = %settings.persistence.state_path.display(),
            persistence_engine = ?settings.persistence.engine,
            history = settings.history,
            "market stream ready"
        );

        // Give the orchestrator an initial risk context for every symbol.
        for symbol in &symbols {
            let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
            orchestrator.update_risk_context(symbol.clone(), ctx);
        }

        Ok(Self {
            market,
            orchestrator,
            state_repo,
            persisted,
            event_bus,
            recorder,
            control_task: Some(control_task),
            shutdown,
            metrics_task,
            alert_task,
            reconciliation_task,
            reconciliation_ctx,
            private_event_rx,
            last_private_sync,
            subscriber_handles,
            connection_monitors,
            order_timeout_task,
            strategy,
            _public_connection: public_connection,
            _private_connection: private_connection,
        })
    }
718
719    async fn run(mut self) -> Result<()> {
720        info!("live session started");
721        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
722            perform_state_reconciliation(ctx.as_ref())
723                .await
724                .context("initial state reconciliation failed")?;
725        }
726        let backoff = Duration::from_millis(200);
727        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
728
729        while !self.shutdown.triggered() {
730            let mut progressed = false;
731
732            if let Some(tick) = self.market.next_tick().await? {
733                progressed = true;
734                self.event_bus.publish(Event::Tick(TickEvent { tick }));
735            }
736
737            if let Some(candle) = self.market.next_candle().await? {
738                progressed = true;
739                self.event_bus
740                    .publish(Event::Candle(CandleEvent { candle }));
741            }
742
743            if let Some(book) = self.market.next_order_book().await? {
744                progressed = true;
745                self.event_bus
746                    .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
747            }
748
749            tokio::select! {
750                biased;
751                Some(event) = self.private_event_rx.recv() => {
752                    progressed = true;
753                    match event {
754                        BrokerEvent::OrderUpdate(order) => {
755                            info!(
756                                order_id = %order.id,
757                                status = ?order.status,
758                                symbol = %order.request.symbol,
759                                "received private order update"
760                            );
761                            self.event_bus
762                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
763                        }
764                        BrokerEvent::Fill(fill) => {
765                            info!(
766                                order_id = %fill.order_id,
767                                symbol = %fill.symbol,
768                                qty = %fill.fill_quantity,
769                                price = %fill.fill_price,
770                                "received private fill"
771                            );
772                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
773                        }
774                    }
775                }
776                _ = orchestrator_timer.tick() => {
777                    // Drive TWAP and other time-based algorithms
778                    if let Err(e) = self.orchestrator.on_timer_tick().await {
779                        error!("Orchestrator timer tick failed: {}", e);
780                    }
781                }
782                else => {}
783            }
784
785            if !progressed && !self.shutdown.sleep(backoff).await {
786                break;
787            }
788        }
789        info!("live session stopping");
790        self.metrics_task.abort();
791        if let Some(handle) = self.alert_task.take() {
792            handle.abort();
793        }
794        if let Some(handle) = self.reconciliation_task.take() {
795            handle.abort();
796        }
797        self.order_timeout_task.abort();
798        for handle in self.subscriber_handles.drain(..) {
799            handle.abort();
800        }
801        for handle in self.connection_monitors.drain(..) {
802            handle.abort();
803        }
804        if let Err(err) = persist_state(
805            self.state_repo.clone(),
806            self.persisted.clone(),
807            Some(self.strategy.clone()),
808        )
809        .await
810        {
811            warn!(error = %err, "failed to persist shutdown state");
812        }
813        if let Some(task) = self.control_task.take() {
814            if let Err(err) = task.await {
815                warn!(error = %err, "control plane server task aborted");
816            }
817        }
818        if let Some(recorder) = self.recorder.take() {
819            if let Err(err) = recorder.shutdown().await {
820                warn!(error = %err, "failed to flush flight recorder");
821            }
822        }
823        Ok(())
824    }
825}
826
827struct ReconciliationContext {
828    client: Arc<dyn ExecutionClient>,
829    portfolio: Arc<Mutex<Portfolio>>,
830    persisted: Arc<Mutex<LiveState>>,
831    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
832    alerts: Arc<AlertManager>,
833    metrics: Arc<LiveMetrics>,
834    reporting_currency: Symbol,
835    threshold: Decimal,
836}
837
838struct ReconciliationContextConfig {
839    client: Arc<dyn ExecutionClient>,
840    portfolio: Arc<Mutex<Portfolio>>,
841    persisted: Arc<Mutex<LiveState>>,
842    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
843    alerts: Arc<AlertManager>,
844    metrics: Arc<LiveMetrics>,
845    reporting_currency: Symbol,
846    threshold: Decimal,
847}
848
849impl ReconciliationContext {
850    fn new(config: ReconciliationContextConfig) -> Self {
851        let ReconciliationContextConfig {
852            client,
853            portfolio,
854            persisted,
855            state_repo,
856            alerts,
857            metrics,
858            reporting_currency,
859            threshold,
860        } = config;
861        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
862        let threshold = if threshold <= Decimal::ZERO {
863            min_threshold
864        } else {
865            threshold
866        };
867        Self {
868            client,
869            portfolio,
870            persisted,
871            state_repo,
872            alerts,
873            metrics,
874            reporting_currency,
875            threshold,
876        }
877    }
878}
879
880fn spawn_reconciliation_loop(
881    ctx: Arc<ReconciliationContext>,
882    shutdown: ShutdownSignal,
883    interval: Duration,
884) -> JoinHandle<()> {
885    tokio::spawn(async move {
886        while shutdown.sleep(interval).await {
887            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
888                error!(error = %err, "periodic state reconciliation failed");
889            }
890        }
891    })
892}
893
894async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
895    info!("running state reconciliation");
896    let remote_positions = ctx
897        .client
898        .positions()
899        .await
900        .context("failed to fetch remote positions")?;
901    let remote_balances = ctx
902        .client
903        .account_balances()
904        .await
905        .context("failed to fetch remote balances")?;
906    let (local_positions, local_cash) = {
907        let guard = ctx.portfolio.lock().await;
908        (guard.positions(), guard.cash())
909    };
910
911    let remote_map = positions_to_map(remote_positions);
912    let local_map = positions_to_map(local_positions);
913    let mut tracked_symbols: HashSet<String> = HashSet::new();
914    tracked_symbols.extend(remote_map.keys().cloned());
915    tracked_symbols.extend(local_map.keys().cloned());
916
917    let mut severe_findings = Vec::new();
918    for symbol in tracked_symbols {
919        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
920        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
921        let diff = (local_qty - remote_qty).abs();
922        let diff_value = diff.to_f64().unwrap_or(0.0);
923        ctx.metrics.update_position_diff(&symbol, diff_value);
924        if diff > Decimal::ZERO {
925            warn!(
926                symbol = %symbol,
927                local = %local_qty,
928                remote = %remote_qty,
929                diff = %diff,
930                "position mismatch detected during reconciliation"
931            );
932            let pct = normalize_diff(diff, remote_qty);
933            if pct >= ctx.threshold {
934                error!(
935                    symbol = %symbol,
936                    local = %local_qty,
937                    remote = %remote_qty,
938                    diff = %diff,
939                    pct = %pct,
940                    "position mismatch exceeds threshold"
941                );
942                severe_findings.push(format!(
943                    "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
944                ));
945            }
946        }
947    }
948
949    let reporting = ctx.reporting_currency.as_str();
950    let remote_cash = remote_balances
951        .iter()
952        .find(|balance| balance.currency == reporting)
953        .map(|balance| balance.available)
954        .unwrap_or_else(|| Decimal::ZERO);
955    let cash_diff = (remote_cash - local_cash).abs();
956    ctx.metrics
957        .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
958    if cash_diff > Decimal::ZERO {
959        warn!(
960            currency = %reporting,
961            local = %local_cash,
962            remote = %remote_cash,
963            diff = %cash_diff,
964            "balance mismatch detected during reconciliation"
965        );
966        let pct = normalize_diff(cash_diff, remote_cash);
967        if pct >= ctx.threshold {
968            error!(
969                currency = %reporting,
970                local = %local_cash,
971                remote = %remote_cash,
972                diff = %cash_diff,
973                pct = %pct,
974                "balance mismatch exceeds threshold"
975            );
976            severe_findings.push(format!(
977                "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
978            ));
979        }
980    }
981
982    if severe_findings.is_empty() {
983        info!("state reconciliation complete with no critical divergence");
984        return Ok(());
985    }
986
987    let alert_body = severe_findings.join("; ");
988    ctx.alerts
989        .notify("State reconciliation divergence", &alert_body)
990        .await;
991    enforce_liquidate_only(ctx).await;
992    Ok(())
993}
994
995async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
996    let snapshot = {
997        let mut guard = ctx.portfolio.lock().await;
998        if !guard.set_liquidate_only(true) {
999            return;
1000        }
1001        info!("entering liquidate-only mode due to reconciliation divergence");
1002        guard.snapshot()
1003    };
1004    {
1005        let mut state = ctx.persisted.lock().await;
1006        state.portfolio = Some(snapshot);
1007    }
1008    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1009        warn!(error = %err, "failed to persist liquidate-only transition");
1010    }
1011}
1012
1013fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
1014    let mut map = HashMap::new();
1015    for position in positions {
1016        map.insert(position.symbol.clone(), position_signed_qty(&position));
1017    }
1018    map
1019}
1020
1021fn position_signed_qty(position: &Position) -> Decimal {
1022    match position.side {
1023        Some(Side::Buy) => position.quantity,
1024        Some(Side::Sell) => -position.quantity,
1025        None => Decimal::ZERO,
1026    }
1027}
1028
1029fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1030    if diff <= Decimal::ZERO {
1031        Decimal::ZERO
1032    } else {
1033        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1034        diff / denominator
1035    }
1036}
1037
1038fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
1039    let mut payload = serde_json::Map::new();
1040    payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1041    payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1042    payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1043    payload.insert(
1044        "api_secret".into(),
1045        Value::String(exchange.api_secret.clone()),
1046    );
1047    payload.insert(
1048        "category".into(),
1049        Value::String(settings.category.as_path().to_string()),
1050    );
1051    payload.insert(
1052        "orderbook_depth".into(),
1053        Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1054    );
1055    if let Value::Object(extra) = exchange.params.clone() {
1056        for (key, value) in extra {
1057            payload.insert(key, value);
1058        }
1059    }
1060    Value::Object(payload)
1061}
1062
1063#[derive(Default)]
1064struct MarketSnapshot {
1065    last_trade: Option<Price>,
1066    last_trade_ts: Option<DateTime<Utc>>,
1067    last_candle: Option<Candle>,
1068}
1069
1070impl MarketSnapshot {
1071    fn price(&self) -> Option<Price> {
1072        self.last_trade
1073            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1074    }
1075}
1076
1077pub struct ShutdownSignal {
1078    flag: Arc<AtomicBool>,
1079    notify: Arc<Notify>,
1080}
1081
1082impl ShutdownSignal {
1083    pub fn new() -> Self {
1084        let flag = Arc::new(AtomicBool::new(false));
1085        let notify = Arc::new(Notify::new());
1086        let flag_clone = flag.clone();
1087        let notify_clone = notify.clone();
1088        tokio::spawn(async move {
1089            if tokio::signal::ctrl_c().await.is_ok() {
1090                flag_clone.store(true, Ordering::SeqCst);
1091                notify_clone.notify_waiters();
1092            }
1093        });
1094        Self { flag, notify }
1095    }
1096
1097    pub fn trigger(&self) {
1098        self.flag.store(true, Ordering::SeqCst);
1099        self.notify.notify_waiters();
1100    }
1101
1102    pub fn triggered(&self) -> bool {
1103        self.flag.load(Ordering::SeqCst)
1104    }
1105
1106    pub async fn wait(&self) {
1107        if self.triggered() {
1108            return;
1109        }
1110        self.notify.notified().await;
1111    }
1112
1113    async fn sleep(&self, duration: Duration) -> bool {
1114        tokio::select! {
1115            _ = tokio::time::sleep(duration) => true,
1116            _ = self.notify.notified() => false,
1117        }
1118    }
1119}
1120
1121impl Default for ShutdownSignal {
1122    fn default() -> Self {
1123        Self::new()
1124    }
1125}
1126
1127impl Clone for ShutdownSignal {
1128    fn clone(&self) -> Self {
1129        Self {
1130            flag: self.flag.clone(),
1131            notify: self.notify.clone(),
1132        }
1133    }
1134}
1135
1136#[allow(clippy::too_many_arguments)]
1137fn spawn_event_subscribers(
1138    bus: Arc<EventBus>,
1139    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1140    strategy_ctx: Arc<Mutex<StrategyContext>>,
1141    orchestrator: Arc<OrderOrchestrator>,
1142    portfolio: Arc<Mutex<Portfolio>>,
1143    metrics: Arc<LiveMetrics>,
1144    alerts: Arc<AlertManager>,
1145    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1146    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1147    persisted: Arc<Mutex<LiveState>>,
1148    exec_backend: ExecutionBackend,
1149    recorder: Option<RecorderHandle>,
1150    last_data_timestamp: Arc<AtomicI64>,
1151    driver: Arc<String>,
1152) -> Vec<JoinHandle<()>> {
1153    let mut handles = Vec::new();
1154    let market_recorder = recorder.clone();
1155
1156    let market_bus = bus.clone();
1157    let market_strategy = strategy.clone();
1158    let market_ctx = strategy_ctx.clone();
1159    let market_metrics = metrics.clone();
1160    let market_alerts = alerts.clone();
1161    let market_state = state_repo.clone();
1162    let market_persisted = persisted.clone();
1163    let market_portfolio = portfolio.clone();
1164    let market_snapshot = market.clone();
1165    let orchestrator_clone = orchestrator.clone();
1166    let market_data_tracker = last_data_timestamp.clone();
1167    let driver_clone = driver.clone();
1168    handles.push(tokio::spawn(async move {
1169        let recorder = market_recorder;
1170        let mut stream = market_bus.subscribe();
1171        loop {
1172            match stream.recv().await {
1173                Ok(Event::Tick(evt)) => {
1174                    if let Some(handle) = recorder.as_ref() {
1175                        handle.record_tick(evt.tick.clone());
1176                    }
1177                    if let Err(err) = process_tick_event(
1178                        evt.tick,
1179                        market_strategy.clone(),
1180                        market_ctx.clone(),
1181                        market_metrics.clone(),
1182                        market_alerts.clone(),
1183                        market_snapshot.clone(),
1184                        market_portfolio.clone(),
1185                        market_state.clone(),
1186                        market_persisted.clone(),
1187                        market_bus.clone(),
1188                        market_data_tracker.clone(),
1189                    )
1190                    .await
1191                    {
1192                        warn!(error = %err, "tick handler failed");
1193                    }
1194                }
1195                Ok(Event::Candle(evt)) => {
1196                    if let Some(handle) = recorder.as_ref() {
1197                        handle.record_candle(evt.candle.clone());
1198                    }
1199                    if let Err(err) = process_candle_event(
1200                        evt.candle,
1201                        market_strategy.clone(),
1202                        market_ctx.clone(),
1203                        market_metrics.clone(),
1204                        market_alerts.clone(),
1205                        market_snapshot.clone(),
1206                        market_portfolio.clone(),
1207                        orchestrator_clone.clone(),
1208                        exec_backend,
1209                        market_state.clone(),
1210                        market_persisted.clone(),
1211                        market_bus.clone(),
1212                        market_data_tracker.clone(),
1213                    )
1214                    .await
1215                    {
1216                        warn!(error = %err, "candle handler failed");
1217                    }
1218                }
1219                Ok(Event::OrderBook(evt)) => {
1220                    if let Err(err) = process_order_book_event(
1221                        evt.order_book,
1222                        market_strategy.clone(),
1223                        market_ctx.clone(),
1224                        market_metrics.clone(),
1225                        market_alerts.clone(),
1226                        market_snapshot.clone(),
1227                        market_bus.clone(),
1228                        market_data_tracker.clone(),
1229                        driver_clone.clone(),
1230                    )
1231                    .await
1232                    {
1233                        warn!(error = %err, "order book handler failed");
1234                    }
1235                }
1236                Ok(_) => {}
1237                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1238                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1239                    warn!(lag = lag, "market subscriber lagged");
1240                    continue;
1241                }
1242            }
1243        }
1244    }));
1245
1246    let exec_bus = bus.clone();
1247    let exec_portfolio = portfolio.clone();
1248    let exec_market = market.clone();
1249    let exec_persisted = persisted.clone();
1250    let exec_alerts = alerts.clone();
1251    let exec_metrics = metrics.clone();
1252    let exec_orchestrator = orchestrator.clone();
1253    handles.push(tokio::spawn(async move {
1254        let orchestrator = exec_orchestrator.clone();
1255        let mut stream = exec_bus.subscribe();
1256        loop {
1257            match stream.recv().await {
1258                Ok(Event::Signal(evt)) => {
1259                    if let Err(err) = process_signal_event(
1260                        evt.signal,
1261                        orchestrator.clone(),
1262                        exec_portfolio.clone(),
1263                        exec_market.clone(),
1264                        exec_persisted.clone(),
1265                        exec_alerts.clone(),
1266                        exec_metrics.clone(),
1267                    )
1268                    .await
1269                    {
1270                        warn!(error = %err, "signal handler failed");
1271                    }
1272                }
1273                Ok(_) => {}
1274                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1275                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1276                    warn!(lag = lag, "signal subscriber lagged");
1277                    continue;
1278                }
1279            }
1280        }
1281    }));
1282
1283    let fill_bus = bus.clone();
1284    let fill_state = state_repo.clone();
1285    let fill_orchestrator = orchestrator.clone();
1286    let fill_persisted = persisted.clone();
1287    let fill_alerts = alerts.clone();
1288    let fill_recorder = recorder.clone();
1289    handles.push(tokio::spawn(async move {
1290        let orchestrator = fill_orchestrator.clone();
1291        let persisted = fill_persisted.clone();
1292        let recorder = fill_recorder;
1293        let mut stream = fill_bus.subscribe();
1294        loop {
1295            match stream.recv().await {
1296                Ok(Event::Fill(evt)) => {
1297                    if let Some(handle) = recorder.as_ref() {
1298                        handle.record_fill(evt.fill.clone());
1299                    }
1300                    if let Err(err) = process_fill_event(
1301                        evt.fill,
1302                        portfolio.clone(),
1303                        strategy.clone(),
1304                        strategy_ctx.clone(),
1305                        orchestrator.clone(),
1306                        metrics.clone(),
1307                        fill_alerts.clone(),
1308                        fill_state.clone(),
1309                        persisted.clone(),
1310                    )
1311                    .await
1312                    {
1313                        warn!(error = %err, "fill handler failed");
1314                    }
1315                }
1316                Ok(_) => {}
1317                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1318                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1319                    warn!(lag = lag, "fill subscriber lagged");
1320                    continue;
1321                }
1322            }
1323        }
1324    }));
1325
1326    let order_bus = bus.clone();
1327    let order_persisted = persisted.clone();
1328    let order_alerts = alerts.clone();
1329    let order_orchestrator = orchestrator.clone();
1330    // Note: We don't pass strategy to order update handler to avoid lock contention
1331    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
1332    let order_recorder = recorder;
1333    handles.push(tokio::spawn(async move {
1334        let orchestrator = order_orchestrator.clone();
1335        let persisted = order_persisted.clone();
1336        let recorder = order_recorder;
1337        let mut stream = order_bus.subscribe();
1338        loop {
1339            match stream.recv().await {
1340                Ok(Event::OrderUpdate(evt)) => {
1341                    if let Some(handle) = recorder.as_ref() {
1342                        handle.record_order(evt.order.clone());
1343                    }
1344                    if let Err(err) = process_order_update_event(
1345                        evt.order,
1346                        orchestrator.clone(),
1347                        order_alerts.clone(),
1348                        state_repo.clone(),
1349                        persisted.clone(),
1350                    )
1351                    .await
1352                    {
1353                        warn!(error = %err, "order update handler failed");
1354                    }
1355                }
1356                Ok(_) => {}
1357                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1358                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1359                    warn!(lag = lag, "order subscriber lagged");
1360                    continue;
1361                }
1362            }
1363        }
1364    }));
1365
1366    handles
1367}
1368
1369#[allow(clippy::too_many_arguments)]
1370async fn process_tick_event(
1371    tick: Tick,
1372    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1373    strategy_ctx: Arc<Mutex<StrategyContext>>,
1374    metrics: Arc<LiveMetrics>,
1375    alerts: Arc<AlertManager>,
1376    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1377    portfolio: Arc<Mutex<Portfolio>>,
1378    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1379    persisted: Arc<Mutex<LiveState>>,
1380    bus: Arc<EventBus>,
1381    last_data_timestamp: Arc<AtomicI64>,
1382) -> Result<()> {
1383    metrics.inc_tick();
1384    metrics.update_staleness(0.0);
1385    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1386    last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1387    alerts.heartbeat().await;
1388    {
1389        let mut guard = market.lock().await;
1390        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1391            snapshot.last_trade = Some(tick.price);
1392            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1393        }
1394    }
1395    let mut drawdown_triggered = false;
1396    let mut snapshot_on_trigger = None;
1397    {
1398        let mut guard = portfolio.lock().await;
1399        let was_liquidate_only = guard.liquidate_only();
1400        match guard.update_market_data(&tick.symbol, tick.price) {
1401            Ok(_) => {
1402                if !was_liquidate_only && guard.liquidate_only() {
1403                    drawdown_triggered = true;
1404                    snapshot_on_trigger = Some(guard.snapshot());
1405                }
1406            }
1407            Err(err) => {
1408                warn!(
1409                    symbol = %tick.symbol,
1410                    error = %err,
1411                    "failed to refresh market data"
1412                );
1413            }
1414        }
1415    }
1416    {
1417        let mut state = persisted.lock().await;
1418        state.last_prices.insert(tick.symbol.clone(), tick.price);
1419        if drawdown_triggered {
1420            if let Some(snapshot) = snapshot_on_trigger.take() {
1421                state.portfolio = Some(snapshot);
1422            }
1423        }
1424    }
1425    if drawdown_triggered {
1426        persist_state(
1427            state_repo.clone(),
1428            persisted.clone(),
1429            Some(strategy.clone()),
1430        )
1431        .await?;
1432        alert_liquidate_only(alerts.clone()).await;
1433    }
1434    {
1435        let mut ctx = strategy_ctx.lock().await;
1436        ctx.push_tick(tick.clone());
1437        let lock_start = Instant::now();
1438        let mut strat = strategy.lock().await;
1439        log_strategy_lock("tick", lock_start.elapsed());
1440        let call_start = Instant::now();
1441        strat
1442            .on_tick(&ctx, &tick)
1443            .await
1444            .context("strategy failure on tick event")?;
1445        log_strategy_call("tick", call_start.elapsed());
1446    }
1447    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1448    Ok(())
1449}
1450
1451#[allow(clippy::too_many_arguments)]
1452async fn process_candle_event(
1453    candle: Candle,
1454    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1455    strategy_ctx: Arc<Mutex<StrategyContext>>,
1456    metrics: Arc<LiveMetrics>,
1457    alerts: Arc<AlertManager>,
1458    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1459    portfolio: Arc<Mutex<Portfolio>>,
1460    orchestrator: Arc<OrderOrchestrator>,
1461    exec_backend: ExecutionBackend,
1462    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1463    persisted: Arc<Mutex<LiveState>>,
1464    bus: Arc<EventBus>,
1465    last_data_timestamp: Arc<AtomicI64>,
1466) -> Result<()> {
1467    metrics.inc_candle();
1468    metrics.update_staleness(0.0);
1469    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1470    last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1471    alerts.heartbeat().await;
1472    metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1473    {
1474        let mut guard = market.lock().await;
1475        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1476            snapshot.last_candle = Some(candle.clone());
1477            snapshot.last_trade = Some(candle.close);
1478        }
1479    }
1480    if exec_backend.is_paper() {
1481        let client = orchestrator.execution_engine().client();
1482        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1483            paper.update_price(&candle.symbol, candle.close);
1484        }
1485    }
1486    let mut candle_drawdown_triggered = false;
1487    let mut candle_snapshot = None;
1488    {
1489        let mut guard = portfolio.lock().await;
1490        let was_liquidate_only = guard.liquidate_only();
1491        match guard.update_market_data(&candle.symbol, candle.close) {
1492            Ok(_) => {
1493                if !was_liquidate_only && guard.liquidate_only() {
1494                    candle_drawdown_triggered = true;
1495                    candle_snapshot = Some(guard.snapshot());
1496                }
1497            }
1498            Err(err) => {
1499                warn!(
1500                    symbol = %candle.symbol,
1501                    error = %err,
1502                    "failed to refresh market data"
1503                );
1504            }
1505        }
1506    }
1507    if candle_drawdown_triggered {
1508        if let Some(snapshot) = candle_snapshot.take() {
1509            let mut persisted_guard = persisted.lock().await;
1510            persisted_guard.portfolio = Some(snapshot);
1511        }
1512        alert_liquidate_only(alerts.clone()).await;
1513    }
1514    {
1515        let mut ctx = strategy_ctx.lock().await;
1516        ctx.push_candle(candle.clone());
1517        let lock_start = Instant::now();
1518        let mut strat = strategy.lock().await;
1519        log_strategy_lock("candle", lock_start.elapsed());
1520        let call_start = Instant::now();
1521        strat
1522            .on_candle(&ctx, &candle)
1523            .await
1524            .context("strategy failure on candle event")?;
1525        log_strategy_call("candle", call_start.elapsed());
1526    }
1527    {
1528        let mut snapshot = persisted.lock().await;
1529        snapshot.last_candle_ts = Some(candle.timestamp);
1530        snapshot
1531            .last_prices
1532            .insert(candle.symbol.clone(), candle.close);
1533    }
1534    persist_state(
1535        state_repo.clone(),
1536        persisted.clone(),
1537        Some(strategy.clone()),
1538    )
1539    .await?;
1540    let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1541    orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1542    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1543    Ok(())
1544}
1545
1546#[allow(clippy::too_many_arguments)]
1547async fn process_order_book_event(
1548    mut book: OrderBook,
1549    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1550    strategy_ctx: Arc<Mutex<StrategyContext>>,
1551    metrics: Arc<LiveMetrics>,
1552    alerts: Arc<AlertManager>,
1553    _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1554    bus: Arc<EventBus>,
1555    last_data_timestamp: Arc<AtomicI64>,
1556    driver: Arc<String>,
1557) -> Result<()> {
1558    metrics.update_staleness(0.0);
1559    alerts.heartbeat().await;
1560    last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1561    let driver_name = driver.as_str();
1562    let local_checksum = if let Some(cs) = book.local_checksum {
1563        cs
1564    } else {
1565        let computed = book.computed_checksum(None);
1566        book.local_checksum = Some(computed);
1567        computed
1568    };
1569    if let Some(expected) = book.exchange_checksum {
1570        if expected != local_checksum {
1571            metrics.inc_checksum_mismatch(driver_name, &book.symbol);
1572            alerts
1573                .order_book_checksum_mismatch(driver_name, &book.symbol, expected, local_checksum)
1574                .await;
1575        }
1576    }
1577    {
1578        let mut ctx = strategy_ctx.lock().await;
1579        ctx.push_order_book(book.clone());
1580        let lock_start = Instant::now();
1581        let mut strat = strategy.lock().await;
1582        log_strategy_lock("order_book", lock_start.elapsed());
1583        let call_start = Instant::now();
1584        strat
1585            .on_order_book(&ctx, &book)
1586            .await
1587            .context("strategy failure on order book")?;
1588        log_strategy_call("order_book", call_start.elapsed());
1589    }
1590    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1591    Ok(())
1592}
1593
1594async fn process_signal_event(
1595    signal: Signal,
1596    orchestrator: Arc<OrderOrchestrator>,
1597    portfolio: Arc<Mutex<Portfolio>>,
1598    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1599    persisted: Arc<Mutex<LiveState>>,
1600    alerts: Arc<AlertManager>,
1601    metrics: Arc<LiveMetrics>,
1602) -> Result<()> {
1603    let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1604    orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1605    match orchestrator.on_signal(&signal, &ctx).await {
1606        Ok(()) => {
1607            alerts.reset_order_failures().await;
1608        }
1609        Err(err) => {
1610            metrics.inc_order_failure();
1611            alerts
1612                .order_failure(&format!("orchestrator error: {err}"))
1613                .await;
1614        }
1615    }
1616    Ok(())
1617}
1618
1619fn log_strategy_lock(event: &str, wait: Duration) {
1620    let wait_ms = wait.as_secs_f64() * 1000.0;
1621    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1622        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1623    } else {
1624        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1625    }
1626}
1627
1628fn log_strategy_call(event: &str, elapsed: Duration) {
1629    let duration_ms = elapsed.as_secs_f64() * 1000.0;
1630    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1631        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1632    } else {
1633        trace!(target: "strategy", event, duration_ms, "strategy call completed");
1634    }
1635}
1636
1637#[allow(clippy::too_many_arguments)]
1638async fn process_fill_event(
1639    fill: Fill,
1640    portfolio: Arc<Mutex<Portfolio>>,
1641    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1642    strategy_ctx: Arc<Mutex<StrategyContext>>,
1643    orchestrator: Arc<OrderOrchestrator>,
1644    metrics: Arc<LiveMetrics>,
1645    alerts: Arc<AlertManager>,
1646    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1647    persisted: Arc<Mutex<LiveState>>,
1648) -> Result<()> {
1649    let mut drawdown_triggered = false;
1650    {
1651        let mut guard = portfolio.lock().await;
1652        let was_liquidate_only = guard.liquidate_only();
1653        guard
1654            .apply_fill(&fill)
1655            .context("Failed to apply fill to portfolio")?;
1656        if !was_liquidate_only && guard.liquidate_only() {
1657            drawdown_triggered = true;
1658        }
1659        let snapshot = guard.snapshot();
1660        let mut persisted_guard = persisted.lock().await;
1661        persisted_guard.portfolio = Some(snapshot);
1662    }
1663    {
1664        let positions = {
1665            let guard = portfolio.lock().await;
1666            guard.positions()
1667        };
1668        let mut ctx = strategy_ctx.lock().await;
1669        ctx.update_positions(positions);
1670    }
1671    orchestrator.on_fill(&fill).await.ok();
1672    {
1673        let ctx = strategy_ctx.lock().await;
1674        let lock_start = Instant::now();
1675        let mut strat = strategy.lock().await;
1676        log_strategy_lock("fill", lock_start.elapsed());
1677        let call_start = Instant::now();
1678        strat
1679            .on_fill(&ctx, &fill)
1680            .await
1681            .context("Strategy failed on fill event")?;
1682        log_strategy_call("fill", call_start.elapsed());
1683    }
1684    let equity = {
1685        let guard = portfolio.lock().await;
1686        guard.equity()
1687    };
1688    if let Some(value) = equity.to_f64() {
1689        metrics.update_equity(value);
1690    }
1691    alerts.update_equity(equity).await;
1692    metrics.inc_order();
1693    alerts
1694        .notify(
1695            "Order Filled",
1696            &format!(
1697                "order filled: {}@{} ({})",
1698                fill.fill_quantity,
1699                fill.fill_price,
1700                match fill.side {
1701                    Side::Buy => "buy",
1702                    Side::Sell => "sell",
1703                }
1704            ),
1705        )
1706        .await;
1707    if drawdown_triggered {
1708        alert_liquidate_only(alerts.clone()).await;
1709    }
1710    persist_state(
1711        state_repo.clone(),
1712        persisted.clone(),
1713        Some(strategy.clone()),
1714    )
1715    .await?;
1716    Ok(())
1717}
1718
1719async fn process_order_update_event(
1720    order: Order,
1721    orchestrator: Arc<OrderOrchestrator>,
1722    alerts: Arc<AlertManager>,
1723    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1724    persisted: Arc<Mutex<LiveState>>,
1725) -> Result<()> {
1726    orchestrator.on_order_update(&order);
1727    if matches!(order.status, OrderStatus::Rejected) {
1728        error!(
1729            order_id = %order.id,
1730            symbol = %order.request.symbol,
1731            "order rejected by exchange"
1732        );
1733        alerts.order_failure("order rejected by exchange").await;
1734        alerts
1735            .notify(
1736                "Order rejected",
1737                &format!(
1738                    "Order {} for {} was rejected",
1739                    order.id, order.request.symbol
1740                ),
1741            )
1742            .await;
1743    }
1744    {
1745        let mut snapshot = persisted.lock().await;
1746        let mut found = false;
1747        for existing in &mut snapshot.open_orders {
1748            if existing.id == order.id {
1749                *existing = order.clone();
1750                found = true;
1751                break;
1752            }
1753        }
1754        if !found {
1755            snapshot.open_orders.push(order.clone());
1756        }
1757        if matches!(
1758            order.status,
1759            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1760        ) {
1761            snapshot.open_orders.retain(|o| o.id != order.id);
1762        }
1763    }
1764    persist_state(state_repo, persisted, None).await?;
1765    Ok(())
1766}
1767
1768async fn emit_signals(
1769    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1770    bus: Arc<EventBus>,
1771    metrics: Arc<LiveMetrics>,
1772) {
1773    let signals = {
1774        let mut strat = strategy.lock().await;
1775        strat.drain_signals()
1776    };
1777    if signals.is_empty() {
1778        return;
1779    }
1780    metrics.inc_signals(signals.len());
1781    for signal in signals {
1782        bus.publish(Event::Signal(SignalEvent { signal }));
1783    }
1784}
1785
1786async fn persist_state(
1787    repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1788    persisted: Arc<Mutex<LiveState>>,
1789    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1790) -> Result<()> {
1791    if let Some(strat_lock) = strategy {
1792        // Snapshot strategy state before cloning the full state for persistence
1793        let strat = strat_lock.lock().await;
1794        if let Ok(json_state) = strat.snapshot() {
1795            let mut guard = persisted.lock().await;
1796            guard.strategy_state = Some(json_state);
1797        } else {
1798            warn!("failed to snapshot strategy state");
1799        }
1800    }
1801
1802    let snapshot = {
1803        let guard = persisted.lock().await;
1804        guard.clone()
1805    };
1806    tokio::task::spawn_blocking(move || repo.save(&snapshot))
1807        .await
1808        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1809        .map_err(|err| anyhow!(err.to_string()))
1810}
1811
1812async fn shared_risk_context(
1813    symbol: &str,
1814    portfolio: &Arc<Mutex<Portfolio>>,
1815    market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1816    persisted: &Arc<Mutex<LiveState>>,
1817) -> RiskContext {
1818    let (signed_qty, equity, liquidate_only) = {
1819        let guard = portfolio.lock().await;
1820        (
1821            guard.signed_position_qty(symbol),
1822            guard.equity(),
1823            guard.liquidate_only(),
1824        )
1825    };
1826    let observed_price = {
1827        let guard = market.lock().await;
1828        guard.get(symbol).and_then(|snapshot| snapshot.price())
1829    };
1830    let last_price = if let Some(price) = observed_price {
1831        price
1832    } else {
1833        let guard = persisted.lock().await;
1834        guard
1835            .last_prices
1836            .get(symbol)
1837            .copied()
1838            .unwrap_or(Decimal::ZERO)
1839    };
1840    RiskContext {
1841        signed_position_qty: signed_qty,
1842        portfolio_equity: equity,
1843        last_price,
1844        liquidate_only,
1845    }
1846}
1847
1848async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1849    alerts
1850        .notify(
1851            "Max drawdown triggered",
1852            "Portfolio entered liquidate-only mode; new exposure blocked until review",
1853        )
1854        .await;
1855}
1856
1857fn spawn_connection_monitor(
1858    shutdown: ShutdownSignal,
1859    flag: Arc<AtomicBool>,
1860    metrics: Arc<LiveMetrics>,
1861    stream: &'static str,
1862) -> JoinHandle<()> {
1863    tokio::spawn(async move {
1864        loop {
1865            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1866            if !shutdown.sleep(Duration::from_secs(5)).await {
1867                break;
1868            }
1869        }
1870    })
1871}
1872
1873fn spawn_order_timeout_monitor(
1874    orchestrator: Arc<OrderOrchestrator>,
1875    bus: Arc<EventBus>,
1876    alerts: Arc<AlertManager>,
1877    shutdown: ShutdownSignal,
1878) -> JoinHandle<()> {
1879    tokio::spawn(async move {
1880        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1881        loop {
1882            ticker.tick().await;
1883            if shutdown.triggered() {
1884                break;
1885            }
1886            match orchestrator.poll_stale_orders().await {
1887                Ok(updates) => {
1888                    for order in updates {
1889                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1890                            let message = format!(
1891                                "Order {} for {} timed out after {}s",
1892                                order.id,
1893                                order.request.symbol,
1894                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1895                            );
1896                            error!(%message);
1897                            alerts.order_failure(&message).await;
1898                            alerts.notify("Order timeout", &message).await;
1899                        }
1900                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1901                    }
1902                }
1903                Err(err) => {
1904                    warn!(error = %err, "order timeout monitor failed");
1905                }
1906            }
1907        }
1908    })
1909}
1910
1911async fn load_market_registry(
1912    client: Arc<dyn ExecutionClient>,
1913    settings: &LiveSessionSettings,
1914) -> Result<Arc<MarketRegistry>> {
1915    if let Some(path) = &settings.markets_file {
1916        let registry = MarketRegistry::load_from_file(path)
1917            .with_context(|| format!("failed to load markets from {}", path.display()))?;
1918        return Ok(Arc::new(registry));
1919    }
1920
1921    if settings.exec_backend.is_paper() {
1922        return Err(anyhow!(
1923            "paper execution requires --markets-file when exchange metadata is unavailable"
1924        ));
1925    }
1926
1927    let instruments = client
1928        .list_instruments(settings.category.as_path())
1929        .await
1930        .context("failed to fetch instruments from execution client")?;
1931    let registry =
1932        MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1933    Ok(Arc::new(registry))
1934}
1935
/// Spawns a detached task that maintains the Bybit private WebSocket session
/// and forwards order updates and fills to `private_tx`.
///
/// After each successful connection the task:
/// 1. marks the private stream as connected (flag and metrics);
/// 2. re-sends the open orders of every symbol in `symbols`;
/// 3. if the execution client is a `BybitClient`, re-sends the executions
///    since `last_sync` (or the last 30 minutes when it is unset);
/// 4. forwards `order` and `execution` messages until the socket ends or
///    shutdown is requested.
///
/// `last_sync` only advances when the execution query succeeded, so a failed
/// reconciliation is retried over the same window on the next connection.
///
/// Whenever the connection is lost or cannot be established, the stream is
/// marked as disconnected and the task waits five seconds before retrying.
/// The task exits once `shutdown` is triggered.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");

                    // Replay open orders so updates missed while offline are applied.
                    for symbol in &symbols {
                        match exec_client.list_open_orders(symbol).await {
                            Ok(orders) => {
                                for order in orders {
                                    if let Err(err) =
                                        private_tx.send(BrokerEvent::OrderUpdate(order)).await
                                    {
                                        error!("failed to send reconciled order update: {err}");
                                    }
                                }
                            }
                            Err(e) => {
                                error!("failed to reconcile open orders for {symbol}: {e}");
                            }
                        }
                    }

                    // Replay executions that happened since the last successful sync.
                    if let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() {
                        let since = {
                            let guard = last_sync.lock().await;
                            guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
                        };
                        match bybit.list_executions_since(since).await {
                            Ok(fills) => {
                                for fill in fills {
                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await
                                    {
                                        error!("failed to send reconciled fill: {err}");
                                    }
                                }
                                // Only a successful query may move the watermark;
                                // otherwise the fills in this window would never
                                // be requested again.
                                *last_sync.lock().await = Some(Utc::now());
                            }
                            Err(e) => {
                                error!("failed to reconcile executions since {:?}: {}", since, e);
                            }
                        }
                    }

                    loop {
                        // Race the socket against shutdown so an idle stream
                        // cannot keep the task alive.
                        tokio::select! {
                            _ = shutdown.wait() => break,
                            incoming = socket.next() => {
                                let Some(msg) = incoming else { break };
                                if shutdown.triggered() {
                                    break;
                                }
                                // Non-text frames and read errors are ignored.
                                if let Ok(Message::Text(text)) = msg {
                                    forward_bybit_private_message(&text, &private_tx).await;
                                }
                            }
                        }
                    }
                    if !shutdown.triggered() {
                        warn!("Bybit private WebSocket stream closed; reconnecting");
                    }
                }
                Err(e) => {
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                }
            }

            // Reaching this point means there is no live connection.
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            // Back off so a failing endpoint is not hit in a tight loop.
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}

/// Parses one text frame from the Bybit private stream and forwards the order
/// updates or fills it contains to `private_tx`.
///
/// Frames that are not valid JSON, carry no `topic`, use a topic other than
/// `order` / `execution`, or fail to convert are ignored. Channel send
/// failures are logged.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return;
    };
    let Some(topic) = value.get("topic").and_then(|v| v.as_str()) else {
        return;
    };
    match topic {
        "order" => {
            let Ok(msg) =
                serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value.clone())
            else {
                return;
            };
            for update in msg.data {
                if let Ok(order) = update.to_tesser_order(None) {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send private order update: {err}");
                    }
                }
            }
        }
        "execution" => {
            let Ok(msg) =
                serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value.clone())
            else {
                return;
            };
            for exec in msg.data {
                if let Ok(fill) = exec.to_tesser_fill() {
                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                        error!("failed to send private fill event: {err}");
                    }
                }
            }
        }
        _ => {}
    }
}
2073
/// Spawns a detached task that maintains the Binance user data stream and
/// forwards order updates and fills to `private_tx`.
///
/// Each iteration starts a user stream to obtain a listen key, connects the
/// WebSocket, registers the event callback and starts a keep-alive task that
/// calls `keepalive_user_stream` every 30 minutes. The session is torn down
/// when the listen key is reported as expired or shutdown is requested.
///
/// After a lost or failed connection the stream is marked as disconnected and
/// the task waits five seconds before retrying. Shutdown is checked at the
/// top of every iteration, so the listen-key retry path also stops when
/// shutdown is triggered. The task exits immediately if `exec_client` is not
/// a `BinanceClient`.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        // The concrete client type cannot change, so resolve it once.
        let Some(binance) = exec_client
            .as_ref()
            .as_any()
            .downcast_ref::<BinanceClient>()
        else {
            warn!("execution client is not Binance");
            return;
        };
        loop {
            if shutdown.triggered() {
                break;
            }
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    // Capacity 1 is enough: a single pending reconnect request suffices.
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if let Err(err) = client.keepalive_user_stream().await {
                                warn!("binance user stream keepalive failed: {err}");
                                break;
                            }
                        }
                    });
                    let shutting_down = tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                            false
                        }
                        _ = shutdown.wait() => true,
                    };
                    // Same teardown whether reconnecting or shutting down.
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                    if shutting_down {
                        return;
                    }
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}