tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, AtomicI64, Ordering},
6    Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{debug, error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23    static INIT: Once = Once::new();
24    INIT.call_once(|| {
25        register_connector_factory(Arc::new(PaperFactory::default()));
26        #[cfg(feature = "bybit")]
27        register_bybit_factory();
28        #[cfg(feature = "binance")]
29        register_binance_factory();
30    });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35    fill_from_update, order_from_update, register_factory as register_binance_factory,
36    ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37    BinanceClient,
38};
39use tesser_broker::{
40    get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41    ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
48use tesser_core::{
49    AccountBalance, Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price,
50    Quantity, Side, Signal, Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55    TickEvent,
56};
57use tesser_execution::{
58    AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
59    PreTradeRiskChecker, RiskContext, RiskLimits, SqliteAlgoStateRepository, StoredAlgoState,
60};
61use tesser_journal::LmdbJournal;
62use tesser_markets::MarketRegistry;
63use tesser_paper::{PaperExecutionClient, PaperFactory};
64use tesser_portfolio::{
65    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
66};
67use tesser_strategy::{Strategy, StrategyContext};
68
69use crate::alerts::{AlertDispatcher, AlertManager};
70use crate::control;
71use crate::telemetry::{spawn_metrics_server, LiveMetrics};
72use crate::PublicChannel;
73
74/// Unified event type for asynchronous updates from the broker.
75#[derive(Debug)]
76pub enum BrokerEvent {
77    OrderUpdate(Order),
78    Fill(Fill),
79}
80
81#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
82#[value(rename_all = "kebab-case")]
83pub enum ExecutionBackend {
84    Paper,
85    Live,
86}
87
88impl ExecutionBackend {
89    fn is_paper(self) -> bool {
90        matches!(self, Self::Paper)
91    }
92}
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
95#[value(rename_all = "kebab-case")]
96pub enum PersistenceBackend {
97    Sqlite,
98    Lmdb,
99}
100
101impl From<PersistenceBackend> for PersistenceEngine {
102    fn from(value: PersistenceBackend) -> Self {
103        match value {
104            PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
105            PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
106        }
107    }
108}
109
/// Order book depth used when none is configured explicitly.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Warning threshold (25 ms) associated with the strategy lock.
// NOTE(review): the code that consumes this threshold is outside this view.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);

/// Warning threshold (250 ms) associated with strategy calls.
// NOTE(review): the code that consumes this threshold is outside this view.
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);

/// Returns [`DEFAULT_ORDER_BOOK_DEPTH`]; usable in `const` contexts.
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}
117
118#[async_trait::async_trait]
119trait LiveMarketStream: Send {
120    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
121    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
122    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
123}
124
125struct FactoryStreamAdapter {
126    inner: Box<dyn ConnectorStream>,
127}
128
129impl FactoryStreamAdapter {
130    fn new(inner: Box<dyn ConnectorStream>) -> Self {
131        Self { inner }
132    }
133}
134
135#[async_trait::async_trait]
136impl LiveMarketStream for FactoryStreamAdapter {
137    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
138        self.inner.next_tick().await
139    }
140
141    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
142        self.inner.next_candle().await
143    }
144
145    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
146        self.inner.next_order_book().await
147    }
148}
149
150#[derive(Clone)]
151pub struct PersistenceSettings {
152    pub engine: PersistenceEngine,
153    pub state_path: PathBuf,
154    pub algo_path: PathBuf,
155}
156
157impl PersistenceSettings {
158    pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
159        let algo_path = match engine {
160            PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
161            PersistenceEngine::Lmdb => state_path.clone(),
162        };
163        Self {
164            engine,
165            state_path,
166            algo_path,
167        }
168    }
169
170    fn algo_repo_path(&self) -> &PathBuf {
171        &self.algo_path
172    }
173}
174
175struct PersistenceHandles {
176    state: Arc<dyn StateRepository<Snapshot = LiveState>>,
177    algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
178}
179
180pub struct LiveSessionSettings {
181    pub category: PublicChannel,
182    pub interval: Interval,
183    pub quantity: Quantity,
184    pub slippage_bps: Decimal,
185    pub fee_bps: Decimal,
186    pub history: usize,
187    pub metrics_addr: SocketAddr,
188    pub persistence: PersistenceSettings,
189    pub initial_balances: HashMap<Symbol, Decimal>,
190    pub reporting_currency: Symbol,
191    pub markets_file: Option<PathBuf>,
192    pub alerting: AlertingConfig,
193    pub exec_backend: ExecutionBackend,
194    pub risk: RiskManagementConfig,
195    pub reconciliation_interval: Duration,
196    pub reconciliation_threshold: Decimal,
197    pub driver: String,
198    pub orderbook_depth: usize,
199    pub record_path: Option<PathBuf>,
200    pub control_addr: SocketAddr,
201}
202
203impl LiveSessionSettings {
204    fn risk_limits(&self) -> RiskLimits {
205        RiskLimits {
206            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
207            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
208        }
209    }
210}
211
212fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
213    match settings.persistence.engine {
214        PersistenceEngine::Sqlite => {
215            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
216                SqliteStateRepository::new(settings.persistence.state_path.clone()),
217            );
218            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
219                SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
220            );
221            Ok(PersistenceHandles {
222                state: state_repo,
223                algo: algo_repo,
224            })
225        }
226        PersistenceEngine::Lmdb => {
227            let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
228            let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
229                Arc::new(journal.state_repo());
230            let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
231                Arc::new(journal.algo_repo());
232            Ok(PersistenceHandles {
233                state: state_repo,
234                algo: algo_repo,
235            })
236        }
237    }
238}
239
240pub async fn run_live(
241    strategy: Box<dyn Strategy>,
242    symbols: Vec<String>,
243    exchange: ExchangeConfig,
244    settings: LiveSessionSettings,
245) -> Result<()> {
246    run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
247}
248
249/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
250pub async fn run_live_with_shutdown(
251    strategy: Box<dyn Strategy>,
252    symbols: Vec<String>,
253    exchange: ExchangeConfig,
254    settings: LiveSessionSettings,
255    shutdown: ShutdownSignal,
256) -> Result<()> {
257    if symbols.is_empty() {
258        return Err(anyhow!("strategy did not declare any subscriptions"));
259    }
260    if settings.quantity <= Decimal::ZERO {
261        return Err(anyhow!("--quantity must be positive"));
262    }
263
264    let public_connection = Arc::new(AtomicBool::new(false));
265    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
266        Some(Arc::new(AtomicBool::new(false)))
267    } else {
268        None
269    };
270    ensure_builtin_connectors_registered();
271    let connector_payload = build_exchange_payload(&exchange, &settings);
272    let connector_factory = get_connector_factory(&settings.driver)
273        .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
274    let stream_config = ConnectorStreamConfig {
275        ws_url: Some(exchange.ws_url.clone()),
276        metadata: json!({
277            "category": settings.category.as_path(),
278            "symbols": symbols.clone(),
279            "orderbook_depth": settings.orderbook_depth,
280        }),
281        connection_status: Some(public_connection.clone()),
282    };
283    let mut connector_stream = connector_factory
284        .create_market_stream(&connector_payload, stream_config)
285        .await
286        .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
287    connector_stream
288        .subscribe(&symbols, settings.interval)
289        .await
290        .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
291    let market_stream: Box<dyn LiveMarketStream> =
292        Box::new(FactoryStreamAdapter::new(connector_stream));
293
294    let execution_client =
295        build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
296    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
297    if matches!(settings.exec_backend, ExecutionBackend::Live) {
298        info!(
299            rest = %exchange.rest_url,
300            driver = ?settings.driver,
301            "live execution enabled via {:?} REST",
302            settings.driver
303        );
304    }
305    let risk_checker: Arc<dyn PreTradeRiskChecker> =
306        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
307    let execution = ExecutionEngine::new(
308        execution_client.clone(),
309        Box::new(FixedOrderSizer {
310            quantity: settings.quantity,
311        }),
312        risk_checker,
313    );
314
315    let mut bootstrap = None;
316    if matches!(settings.exec_backend, ExecutionBackend::Live) {
317        info!("synchronizing portfolio snapshot from exchange");
318        let positions = execution_client
319            .positions()
320            .await
321            .context("failed to fetch remote positions")?;
322        let balances = execution_client
323            .account_balances()
324            .await
325            .context("failed to fetch remote account balances")?;
326        let mut open_orders = Vec::new();
327        for symbol in &symbols {
328            let mut symbol_orders = execution_client
329                .list_open_orders(symbol)
330                .await
331                .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
332            open_orders.append(&mut symbol_orders);
333        }
334        bootstrap = Some(LiveBootstrap {
335            positions,
336            balances,
337            open_orders,
338        });
339    }
340
341    let persistence = build_persistence_handles(&settings)?;
342
343    // Create orchestrator with execution engine
344    let initial_open_orders = bootstrap
345        .as_ref()
346        .map(|data| data.open_orders.clone())
347        .unwrap_or_default();
348    let orchestrator = OrderOrchestrator::new(
349        Arc::new(execution),
350        persistence.algo.clone(),
351        initial_open_orders,
352    )
353    .await?;
354
355    let runtime = LiveRuntime::new(
356        market_stream,
357        strategy,
358        symbols,
359        orchestrator,
360        persistence.state,
361        settings,
362        exchange.ws_url.clone(),
363        market_registry,
364        shutdown,
365        public_connection,
366        private_connection,
367        bootstrap,
368    )
369    .await?;
370    runtime.run().await
371}
372
373async fn build_execution_client(
374    settings: &LiveSessionSettings,
375    connector_factory: Arc<dyn ConnectorFactory>,
376    connector_payload: &Value,
377) -> Result<Arc<dyn ExecutionClient>> {
378    match settings.exec_backend {
379        ExecutionBackend::Paper => {
380            if settings.driver == "paper" {
381                return connector_factory
382                    .create_execution_client(connector_payload)
383                    .await
384                    .map_err(|err| anyhow!("failed to create execution client: {err}"));
385            }
386            Ok(Arc::new(PaperExecutionClient::new(
387                "paper".to_string(),
388                vec!["BTCUSDT".to_string()],
389                settings.slippage_bps,
390                settings.fee_bps,
391            )))
392        }
393        ExecutionBackend::Live => connector_factory
394            .create_execution_client(connector_payload)
395            .await
396            .map_err(|err| anyhow!("failed to create execution client: {err}")),
397    }
398}
399
400struct LiveRuntime {
401    market: Box<dyn LiveMarketStream>,
402    orchestrator: Arc<OrderOrchestrator>,
403    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
404    persisted: Arc<Mutex<LiveState>>,
405    event_bus: Arc<EventBus>,
406    recorder: Option<ParquetRecorder>,
407    control_task: Option<JoinHandle<()>>,
408    shutdown: ShutdownSignal,
409    metrics_task: JoinHandle<()>,
410    alert_task: Option<JoinHandle<()>>,
411    reconciliation_task: Option<JoinHandle<()>>,
412    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
413    private_event_rx: mpsc::Receiver<BrokerEvent>,
414    #[allow(dead_code)]
415    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
416    subscriber_handles: Vec<JoinHandle<()>>,
417    connection_monitors: Vec<JoinHandle<()>>,
418    order_timeout_task: JoinHandle<()>,
419    strategy: Arc<Mutex<Box<dyn Strategy>>>,
420    _public_connection: Arc<AtomicBool>,
421    _private_connection: Option<Arc<AtomicBool>>,
422}
423
424struct LiveBootstrap {
425    positions: Vec<Position>,
426    balances: Vec<AccountBalance>,
427    open_orders: Vec<Order>,
428}
429
430impl LiveRuntime {
431    #[allow(clippy::too_many_arguments)]
432    async fn new(
433        market: Box<dyn LiveMarketStream>,
434        mut strategy: Box<dyn Strategy>,
435        symbols: Vec<String>,
436        orchestrator: OrderOrchestrator,
437        state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
438        settings: LiveSessionSettings,
439        #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
440        market_registry: Arc<MarketRegistry>,
441        shutdown: ShutdownSignal,
442        public_connection: Arc<AtomicBool>,
443        private_connection: Option<Arc<AtomicBool>>,
444        bootstrap: Option<LiveBootstrap>,
445    ) -> Result<Self> {
446        let mut strategy_ctx = StrategyContext::new(settings.history);
447        let driver = Arc::new(settings.driver.clone());
448        let mut persisted = match tokio::task::spawn_blocking({
449            let repo = state_repo.clone();
450            move || repo.load()
451        })
452        .await
453        {
454            Ok(Ok(state)) => state,
455            Ok(Err(err)) => {
456                warn!(error = %err, "failed to load live state; starting from defaults");
457                LiveState::default()
458            }
459            Err(err) => {
460                warn!(error = %err, "state load task failed; starting from defaults");
461                LiveState::default()
462            }
463        };
464        let mut live_bootstrap = None;
465        if let Some(data) = bootstrap {
466            persisted.open_orders = data.open_orders;
467            live_bootstrap = Some((data.positions, data.balances));
468        } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
469            warn!("live session missing bootstrap data; continuing without remote snapshot");
470        }
471
472        let portfolio_cfg = PortfolioConfig {
473            initial_balances: settings.initial_balances.clone(),
474            reporting_currency: settings.reporting_currency.clone(),
475            max_drawdown: Some(settings.risk.max_drawdown),
476        };
477        let portfolio = if let Some((positions, balances)) = live_bootstrap {
478            Portfolio::from_exchange_state(
479                positions,
480                balances,
481                portfolio_cfg.clone(),
482                market_registry.clone(),
483            )
484        } else if let Some(snapshot) = persisted.portfolio.take() {
485            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
486        } else {
487            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
488        };
489        strategy_ctx.update_positions(portfolio.positions());
490        // Restore strategy state if found in persistence
491        if let Some(state) = persisted.strategy_state.take() {
492            info!("restoring strategy state from persistence");
493            strategy
494                .restore(state)
495                .context("failed to restore strategy state")?;
496        }
497        persisted.portfolio = Some(portfolio.snapshot());
498
499        let mut market_snapshots = HashMap::new();
500        for symbol in &symbols {
501            let mut snapshot = MarketSnapshot::default();
502            if let Some(price) = persisted.last_prices.get(symbol).copied() {
503                snapshot.last_trade = Some(price);
504            }
505            market_snapshots.insert(symbol.clone(), snapshot);
506        }
507
508        let metrics = LiveMetrics::new();
509        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
510        if let Some(flag) = &private_connection {
511            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
512        }
513        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
514        let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
515        let alerts = AlertManager::new(
516            settings.alerting,
517            dispatcher,
518            Some(public_connection.clone()),
519            private_connection.clone(),
520        );
521        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
522        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
523        let alerts = Arc::new(alerts);
524        let alert_task = alerts.spawn_watchdog();
525        let metrics = Arc::new(metrics);
526        let mut connection_monitors = Vec::new();
527        connection_monitors.push(spawn_connection_monitor(
528            shutdown.clone(),
529            public_connection.clone(),
530            metrics.clone(),
531            "public",
532        ));
533        if let Some(flag) = private_connection.clone() {
534            connection_monitors.push(spawn_connection_monitor(
535                shutdown.clone(),
536                flag,
537                metrics.clone(),
538                "private",
539            ));
540        }
541
542        if !settings.exec_backend.is_paper() {
543            let execution_engine = orchestrator.execution_engine();
544            let exec_client = execution_engine.client();
545            match settings.driver.as_str() {
546                "bybit" | "" => {
547                    #[cfg(feature = "bybit")]
548                    {
549                        let bybit = exec_client
550                            .as_ref()
551                            .as_any()
552                            .downcast_ref::<BybitClient>()
553                            .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
554                        let creds = bybit
555                            .get_credentials()
556                            .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
557                        spawn_bybit_private_stream(
558                            creds,
559                            bybit.get_ws_url(),
560                            private_event_tx.clone(),
561                            exec_client.clone(),
562                            symbols.clone(),
563                            last_private_sync.clone(),
564                            private_connection.clone(),
565                            metrics.clone(),
566                            shutdown.clone(),
567                        );
568                    }
569                    #[cfg(not(feature = "bybit"))]
570                    {
571                        bail!("driver 'bybit' is unavailable without the 'bybit' feature");
572                    }
573                }
574                "binance" => {
575                    #[cfg(feature = "binance")]
576                    {
577                        spawn_binance_private_stream(
578                            exec_client.clone(),
579                            exchange_ws_url.clone(),
580                            private_event_tx.clone(),
581                            private_connection.clone(),
582                            metrics.clone(),
583                            shutdown.clone(),
584                        );
585                    }
586                    #[cfg(not(feature = "binance"))]
587                    {
588                        bail!("driver 'binance' is unavailable without the 'binance' feature");
589                    }
590                }
591                "paper" => {}
592                other => {
593                    bail!("private stream unsupported for driver '{other}'");
594                }
595            }
596        }
597
598        let recorder = if let Some(record_path) = settings.record_path.clone() {
599            let config = RecorderConfig {
600                root: record_path.clone(),
601                ..RecorderConfig::default()
602            };
603            match ParquetRecorder::spawn(config).await {
604                Ok(recorder) => {
605                    info!(path = %record_path.display(), "flight recorder enabled");
606                    Some(recorder)
607                }
608                Err(err) => {
609                    warn!(
610                        error = %err,
611                        path = %record_path.display(),
612                        "failed to start flight recorder"
613                    );
614                    None
615                }
616            }
617        } else {
618            None
619        };
620        let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
621
622        let strategy = Arc::new(Mutex::new(strategy));
623        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
624        let portfolio = Arc::new(Mutex::new(portfolio));
625        let market_cache = Arc::new(Mutex::new(market_snapshots));
626        let persisted = Arc::new(Mutex::new(persisted));
627        let orchestrator = Arc::new(orchestrator);
628        let event_bus = Arc::new(EventBus::new(2048));
629        let last_data_timestamp = Arc::new(AtomicI64::new(0));
630        let control_task = control::spawn_control_plane(
631            settings.control_addr,
632            portfolio.clone(),
633            orchestrator.clone(),
634            persisted.clone(),
635            last_data_timestamp.clone(),
636            event_bus.clone(),
637            shutdown.clone(),
638        );
639        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
640            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
641                client: orchestrator.execution_engine().client(),
642                portfolio: portfolio.clone(),
643                persisted: persisted.clone(),
644                state_repo: state_repo.clone(),
645                alerts: alerts.clone(),
646                metrics: metrics.clone(),
647                reporting_currency: settings.reporting_currency.clone(),
648                threshold: settings.reconciliation_threshold,
649            }))
650        });
651        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
652            spawn_reconciliation_loop(
653                ctx.clone(),
654                shutdown.clone(),
655                settings.reconciliation_interval,
656            )
657        });
658        let subscriber_handles = spawn_event_subscribers(
659            event_bus.clone(),
660            strategy.clone(),
661            strategy_ctx.clone(),
662            orchestrator.clone(),
663            portfolio.clone(),
664            metrics.clone(),
665            alerts.clone(),
666            market_cache.clone(),
667            state_repo.clone(),
668            persisted.clone(),
669            settings.exec_backend,
670            recorder_handle.clone(),
671            last_data_timestamp.clone(),
672            driver.clone(),
673        );
674        let order_timeout_task = spawn_order_timeout_monitor(
675            orchestrator.clone(),
676            event_bus.clone(),
677            alerts.clone(),
678            shutdown.clone(),
679        );
680
681        info!(
682            symbols = ?symbols,
683            category = ?settings.category,
684            metrics_addr = %settings.metrics_addr,
685            state_path = %settings.persistence.state_path.display(),
686            persistence_engine = ?settings.persistence.engine,
687            history = settings.history,
688            "market stream ready"
689        );
690
691        for symbol in &symbols {
692            let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
693            orchestrator.update_risk_context(symbol.clone(), ctx);
694        }
695
696        Ok(Self {
697            market,
698            orchestrator,
699            state_repo,
700            persisted,
701            event_bus,
702            recorder,
703            control_task: Some(control_task),
704            shutdown,
705            metrics_task,
706            alert_task,
707            reconciliation_task,
708            reconciliation_ctx,
709            private_event_rx,
710            last_private_sync,
711            subscriber_handles,
712            connection_monitors,
713            order_timeout_task,
714            strategy,
715            _public_connection: public_connection,
716            _private_connection: private_connection,
717        })
718    }
719
720    async fn run(mut self) -> Result<()> {
721        info!("live session started");
722        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
723            perform_state_reconciliation(ctx.as_ref())
724                .await
725                .context("initial state reconciliation failed")?;
726        }
727        let backoff = Duration::from_millis(200);
728        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
729
730        'run: while !self.shutdown.triggered() {
731            let mut progressed = false;
732
733            let tick = tokio::select! {
734                res = self.market.next_tick() => Some(res),
735                _ = self.shutdown.wait() => None,
736            };
737            match tick {
738                Some(res) => {
739                    if let Some(tick) = res? {
740                        progressed = true;
741                        self.event_bus.publish(Event::Tick(TickEvent { tick }));
742                    }
743                }
744                None => break 'run,
745            }
746
747            let candle = tokio::select! {
748                res = self.market.next_candle() => Some(res),
749                _ = self.shutdown.wait() => None,
750            };
751            match candle {
752                Some(res) => {
753                    if let Some(candle) = res? {
754                        progressed = true;
755                        self.event_bus
756                            .publish(Event::Candle(CandleEvent { candle }));
757                    }
758                }
759                None => break 'run,
760            }
761
762            let book = tokio::select! {
763                res = self.market.next_order_book() => Some(res),
764                _ = self.shutdown.wait() => None,
765            };
766            match book {
767                Some(res) => {
768                    if let Some(book) = res? {
769                        progressed = true;
770                        self.event_bus
771                            .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
772                    }
773                }
774                None => break 'run,
775            }
776
777            tokio::select! {
778                biased;
779                Some(event) = self.private_event_rx.recv() => {
780                    progressed = true;
781                    match event {
782                        BrokerEvent::OrderUpdate(order) => {
783                            info!(
784                                order_id = %order.id,
785                                status = ?order.status,
786                                symbol = %order.request.symbol,
787                                "received private order update"
788                            );
789                            self.event_bus
790                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
791                        }
792                        BrokerEvent::Fill(fill) => {
793                            info!(
794                                order_id = %fill.order_id,
795                                symbol = %fill.symbol,
796                                qty = %fill.fill_quantity,
797                                price = %fill.fill_price,
798                                "received private fill"
799                            );
800                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
801                        }
802                    }
803                }
804                _ = orchestrator_timer.tick() => {
805                    // Drive TWAP and other time-based algorithms
806                    if let Err(e) = self.orchestrator.on_timer_tick().await {
807                        error!("Orchestrator timer tick failed: {}", e);
808                    }
809                }
810                _ = self.shutdown.wait() => break 'run,
811                else => {}
812            }
813
814            if !progressed && !self.shutdown.sleep(backoff).await {
815                break;
816            }
817        }
818        info!("live session stopping");
819        self.metrics_task.abort();
820        if let Some(handle) = self.alert_task.take() {
821            handle.abort();
822        }
823        if let Some(handle) = self.reconciliation_task.take() {
824            handle.abort();
825        }
826        self.order_timeout_task.abort();
827        for handle in self.subscriber_handles.drain(..) {
828            handle.abort();
829        }
830        for handle in self.connection_monitors.drain(..) {
831            handle.abort();
832        }
833        if let Err(err) = persist_state(
834            self.state_repo.clone(),
835            self.persisted.clone(),
836            Some(self.strategy.clone()),
837        )
838        .await
839        {
840            warn!(error = %err, "failed to persist shutdown state");
841        }
842        if let Some(task) = self.control_task.take() {
843            if let Err(err) = task.await {
844                warn!(error = %err, "control plane server task aborted");
845            }
846        }
847        if let Some(recorder) = self.recorder.take() {
848            if let Err(err) = recorder.shutdown().await {
849                warn!(error = %err, "failed to flush flight recorder");
850            }
851        }
852        Ok(())
853    }
854}
855
856struct ReconciliationContext {
857    client: Arc<dyn ExecutionClient>,
858    portfolio: Arc<Mutex<Portfolio>>,
859    persisted: Arc<Mutex<LiveState>>,
860    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
861    alerts: Arc<AlertManager>,
862    metrics: Arc<LiveMetrics>,
863    reporting_currency: Symbol,
864    threshold: Decimal,
865}
866
867struct ReconciliationContextConfig {
868    client: Arc<dyn ExecutionClient>,
869    portfolio: Arc<Mutex<Portfolio>>,
870    persisted: Arc<Mutex<LiveState>>,
871    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
872    alerts: Arc<AlertManager>,
873    metrics: Arc<LiveMetrics>,
874    reporting_currency: Symbol,
875    threshold: Decimal,
876}
877
878impl ReconciliationContext {
879    fn new(config: ReconciliationContextConfig) -> Self {
880        let ReconciliationContextConfig {
881            client,
882            portfolio,
883            persisted,
884            state_repo,
885            alerts,
886            metrics,
887            reporting_currency,
888            threshold,
889        } = config;
890        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
891        let threshold = if threshold <= Decimal::ZERO {
892            min_threshold
893        } else {
894            threshold
895        };
896        Self {
897            client,
898            portfolio,
899            persisted,
900            state_repo,
901            alerts,
902            metrics,
903            reporting_currency,
904            threshold,
905        }
906    }
907}
908
909fn spawn_reconciliation_loop(
910    ctx: Arc<ReconciliationContext>,
911    shutdown: ShutdownSignal,
912    interval: Duration,
913) -> JoinHandle<()> {
914    tokio::spawn(async move {
915        while shutdown.sleep(interval).await {
916            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
917                error!(error = %err, "periodic state reconciliation failed");
918            }
919        }
920    })
921}
922
923async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
924    info!("running state reconciliation");
925    let remote_positions = ctx
926        .client
927        .positions()
928        .await
929        .context("failed to fetch remote positions")?;
930    let remote_balances = ctx
931        .client
932        .account_balances()
933        .await
934        .context("failed to fetch remote balances")?;
935    let (local_positions, local_cash) = {
936        let guard = ctx.portfolio.lock().await;
937        (guard.positions(), guard.cash())
938    };
939
940    let remote_map = positions_to_map(remote_positions);
941    let local_map = positions_to_map(local_positions);
942    let mut tracked_symbols: HashSet<String> = HashSet::new();
943    tracked_symbols.extend(remote_map.keys().cloned());
944    tracked_symbols.extend(local_map.keys().cloned());
945
946    let mut severe_findings = Vec::new();
947    for symbol in tracked_symbols {
948        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
949        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
950        let diff = (local_qty - remote_qty).abs();
951        let diff_value = diff.to_f64().unwrap_or(0.0);
952        ctx.metrics.update_position_diff(&symbol, diff_value);
953        if diff > Decimal::ZERO {
954            warn!(
955                symbol = %symbol,
956                local = %local_qty,
957                remote = %remote_qty,
958                diff = %diff,
959                "position mismatch detected during reconciliation"
960            );
961            let pct = normalize_diff(diff, remote_qty);
962            if pct >= ctx.threshold {
963                error!(
964                    symbol = %symbol,
965                    local = %local_qty,
966                    remote = %remote_qty,
967                    diff = %diff,
968                    pct = %pct,
969                    "position mismatch exceeds threshold"
970                );
971                severe_findings.push(format!(
972                    "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
973                ));
974            }
975        }
976    }
977
978    let reporting = ctx.reporting_currency.as_str();
979    let remote_cash = remote_balances
980        .iter()
981        .find(|balance| balance.currency == reporting)
982        .map(|balance| balance.available)
983        .unwrap_or_else(|| Decimal::ZERO);
984    let cash_diff = (remote_cash - local_cash).abs();
985    ctx.metrics
986        .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
987    if cash_diff > Decimal::ZERO {
988        warn!(
989            currency = %reporting,
990            local = %local_cash,
991            remote = %remote_cash,
992            diff = %cash_diff,
993            "balance mismatch detected during reconciliation"
994        );
995        let pct = normalize_diff(cash_diff, remote_cash);
996        if pct >= ctx.threshold {
997            error!(
998                currency = %reporting,
999                local = %local_cash,
1000                remote = %remote_cash,
1001                diff = %cash_diff,
1002                pct = %pct,
1003                "balance mismatch exceeds threshold"
1004            );
1005            severe_findings.push(format!(
1006                "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
1007            ));
1008        }
1009    }
1010
1011    if severe_findings.is_empty() {
1012        info!("state reconciliation complete with no critical divergence");
1013        return Ok(());
1014    }
1015
1016    let alert_body = severe_findings.join("; ");
1017    ctx.alerts
1018        .notify("State reconciliation divergence", &alert_body)
1019        .await;
1020    enforce_liquidate_only(ctx).await;
1021    Ok(())
1022}
1023
1024async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
1025    let snapshot = {
1026        let mut guard = ctx.portfolio.lock().await;
1027        if !guard.set_liquidate_only(true) {
1028            return;
1029        }
1030        info!("entering liquidate-only mode due to reconciliation divergence");
1031        guard.snapshot()
1032    };
1033    {
1034        let mut state = ctx.persisted.lock().await;
1035        state.portfolio = Some(snapshot);
1036    }
1037    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1038        warn!(error = %err, "failed to persist liquidate-only transition");
1039    }
1040}
1041
1042fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
1043    let mut map = HashMap::new();
1044    for position in positions {
1045        map.insert(position.symbol.clone(), position_signed_qty(&position));
1046    }
1047    map
1048}
1049
1050fn position_signed_qty(position: &Position) -> Decimal {
1051    match position.side {
1052        Some(Side::Buy) => position.quantity,
1053        Some(Side::Sell) => -position.quantity,
1054        None => Decimal::ZERO,
1055    }
1056}
1057
1058fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1059    if diff <= Decimal::ZERO {
1060        Decimal::ZERO
1061    } else {
1062        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1063        diff / denominator
1064    }
1065}
1066
1067fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
1068    let mut payload = serde_json::Map::new();
1069    payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1070    payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1071    payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1072    payload.insert(
1073        "api_secret".into(),
1074        Value::String(exchange.api_secret.clone()),
1075    );
1076    payload.insert(
1077        "category".into(),
1078        Value::String(settings.category.as_path().to_string()),
1079    );
1080    payload.insert(
1081        "orderbook_depth".into(),
1082        Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1083    );
1084    if let Value::Object(extra) = exchange.params.clone() {
1085        for (key, value) in extra {
1086            payload.insert(key, value);
1087        }
1088    }
1089    Value::Object(payload)
1090}
1091
1092#[derive(Default)]
1093struct MarketSnapshot {
1094    last_trade: Option<Price>,
1095    last_trade_ts: Option<DateTime<Utc>>,
1096    last_candle: Option<Candle>,
1097}
1098
1099impl MarketSnapshot {
1100    fn price(&self) -> Option<Price> {
1101        self.last_trade
1102            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1103    }
1104}
1105
1106pub struct ShutdownSignal {
1107    flag: Arc<AtomicBool>,
1108    notify: Arc<Notify>,
1109}
1110
1111impl ShutdownSignal {
1112    pub fn new() -> Self {
1113        let flag = Arc::new(AtomicBool::new(false));
1114        let notify = Arc::new(Notify::new());
1115        let flag_clone = flag.clone();
1116        let notify_clone = notify.clone();
1117        tokio::spawn(async move {
1118            if tokio::signal::ctrl_c().await.is_ok() {
1119                flag_clone.store(true, Ordering::SeqCst);
1120                notify_clone.notify_waiters();
1121            }
1122        });
1123        Self { flag, notify }
1124    }
1125
1126    pub fn trigger(&self) {
1127        self.flag.store(true, Ordering::SeqCst);
1128        self.notify.notify_waiters();
1129    }
1130
1131    pub fn triggered(&self) -> bool {
1132        self.flag.load(Ordering::SeqCst)
1133    }
1134
1135    pub async fn wait(&self) {
1136        if self.triggered() {
1137            return;
1138        }
1139        self.notify.notified().await;
1140    }
1141
1142    async fn sleep(&self, duration: Duration) -> bool {
1143        tokio::select! {
1144            _ = tokio::time::sleep(duration) => true,
1145            _ = self.notify.notified() => false,
1146        }
1147    }
1148}
1149
1150impl Default for ShutdownSignal {
1151    fn default() -> Self {
1152        Self::new()
1153    }
1154}
1155
1156impl Clone for ShutdownSignal {
1157    fn clone(&self) -> Self {
1158        Self {
1159            flag: self.flag.clone(),
1160            notify: self.notify.clone(),
1161        }
1162    }
1163}
1164
1165#[allow(clippy::too_many_arguments)]
1166fn spawn_event_subscribers(
1167    bus: Arc<EventBus>,
1168    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1169    strategy_ctx: Arc<Mutex<StrategyContext>>,
1170    orchestrator: Arc<OrderOrchestrator>,
1171    portfolio: Arc<Mutex<Portfolio>>,
1172    metrics: Arc<LiveMetrics>,
1173    alerts: Arc<AlertManager>,
1174    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1175    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1176    persisted: Arc<Mutex<LiveState>>,
1177    exec_backend: ExecutionBackend,
1178    recorder: Option<RecorderHandle>,
1179    last_data_timestamp: Arc<AtomicI64>,
1180    driver: Arc<String>,
1181) -> Vec<JoinHandle<()>> {
1182    let mut handles = Vec::new();
1183    let market_recorder = recorder.clone();
1184
1185    let market_bus = bus.clone();
1186    let market_strategy = strategy.clone();
1187    let market_ctx = strategy_ctx.clone();
1188    let market_metrics = metrics.clone();
1189    let market_alerts = alerts.clone();
1190    let market_state = state_repo.clone();
1191    let market_persisted = persisted.clone();
1192    let market_portfolio = portfolio.clone();
1193    let market_snapshot = market.clone();
1194    let orchestrator_clone = orchestrator.clone();
1195    let market_data_tracker = last_data_timestamp.clone();
1196    let driver_clone = driver.clone();
1197    handles.push(tokio::spawn(async move {
1198        let recorder = market_recorder;
1199        let mut stream = market_bus.subscribe();
1200        loop {
1201            match stream.recv().await {
1202                Ok(Event::Tick(evt)) => {
1203                    if let Some(handle) = recorder.as_ref() {
1204                        handle.record_tick(evt.tick.clone());
1205                    }
1206                    if let Err(err) = process_tick_event(
1207                        evt.tick,
1208                        market_strategy.clone(),
1209                        market_ctx.clone(),
1210                        market_metrics.clone(),
1211                        market_alerts.clone(),
1212                        market_snapshot.clone(),
1213                        market_portfolio.clone(),
1214                        market_state.clone(),
1215                        market_persisted.clone(),
1216                        market_bus.clone(),
1217                        market_data_tracker.clone(),
1218                    )
1219                    .await
1220                    {
1221                        warn!(error = %err, "tick handler failed");
1222                    }
1223                }
1224                Ok(Event::Candle(evt)) => {
1225                    if let Some(handle) = recorder.as_ref() {
1226                        handle.record_candle(evt.candle.clone());
1227                    }
1228                    if let Err(err) = process_candle_event(
1229                        evt.candle,
1230                        market_strategy.clone(),
1231                        market_ctx.clone(),
1232                        market_metrics.clone(),
1233                        market_alerts.clone(),
1234                        market_snapshot.clone(),
1235                        market_portfolio.clone(),
1236                        orchestrator_clone.clone(),
1237                        exec_backend,
1238                        market_state.clone(),
1239                        market_persisted.clone(),
1240                        market_bus.clone(),
1241                        market_data_tracker.clone(),
1242                    )
1243                    .await
1244                    {
1245                        warn!(error = %err, "candle handler failed");
1246                    }
1247                }
1248                Ok(Event::OrderBook(evt)) => {
1249                    if let Err(err) = process_order_book_event(
1250                        evt.order_book,
1251                        market_strategy.clone(),
1252                        market_ctx.clone(),
1253                        market_metrics.clone(),
1254                        market_alerts.clone(),
1255                        market_snapshot.clone(),
1256                        market_bus.clone(),
1257                        market_data_tracker.clone(),
1258                        driver_clone.clone(),
1259                    )
1260                    .await
1261                    {
1262                        warn!(error = %err, "order book handler failed");
1263                    }
1264                }
1265                Ok(_) => {}
1266                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1267                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1268                    warn!(lag = lag, "market subscriber lagged");
1269                    continue;
1270                }
1271            }
1272        }
1273    }));
1274
1275    let exec_bus = bus.clone();
1276    let exec_portfolio = portfolio.clone();
1277    let exec_market = market.clone();
1278    let exec_persisted = persisted.clone();
1279    let exec_alerts = alerts.clone();
1280    let exec_metrics = metrics.clone();
1281    let exec_orchestrator = orchestrator.clone();
1282    let exec_recorder = recorder.clone();
1283    handles.push(tokio::spawn(async move {
1284        let orchestrator = exec_orchestrator.clone();
1285        let recorder = exec_recorder;
1286        let mut stream = exec_bus.subscribe();
1287        loop {
1288            match stream.recv().await {
1289                Ok(Event::Signal(evt)) => {
1290                    if let Some(handle) = recorder.as_ref() {
1291                        handle.record_signal(evt.signal.clone());
1292                    }
1293                    if let Err(err) = process_signal_event(
1294                        evt.signal,
1295                        orchestrator.clone(),
1296                        exec_portfolio.clone(),
1297                        exec_market.clone(),
1298                        exec_persisted.clone(),
1299                        exec_alerts.clone(),
1300                        exec_metrics.clone(),
1301                    )
1302                    .await
1303                    {
1304                        warn!(error = %err, "signal handler failed");
1305                    }
1306                }
1307                Ok(_) => {}
1308                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1309                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1310                    warn!(lag = lag, "signal subscriber lagged");
1311                    continue;
1312                }
1313            }
1314        }
1315    }));
1316
1317    let fill_bus = bus.clone();
1318    let fill_state = state_repo.clone();
1319    let fill_orchestrator = orchestrator.clone();
1320    let fill_persisted = persisted.clone();
1321    let fill_alerts = alerts.clone();
1322    let fill_recorder = recorder.clone();
1323    handles.push(tokio::spawn(async move {
1324        let orchestrator = fill_orchestrator.clone();
1325        let persisted = fill_persisted.clone();
1326        let recorder = fill_recorder;
1327        let mut stream = fill_bus.subscribe();
1328        loop {
1329            match stream.recv().await {
1330                Ok(Event::Fill(evt)) => {
1331                    if let Some(handle) = recorder.as_ref() {
1332                        handle.record_fill(evt.fill.clone());
1333                    }
1334                    if let Err(err) = process_fill_event(
1335                        evt.fill,
1336                        portfolio.clone(),
1337                        strategy.clone(),
1338                        strategy_ctx.clone(),
1339                        orchestrator.clone(),
1340                        metrics.clone(),
1341                        fill_alerts.clone(),
1342                        fill_state.clone(),
1343                        persisted.clone(),
1344                    )
1345                    .await
1346                    {
1347                        warn!(error = %err, "fill handler failed");
1348                    }
1349                }
1350                Ok(_) => {}
1351                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1352                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1353                    warn!(lag = lag, "fill subscriber lagged");
1354                    continue;
1355                }
1356            }
1357        }
1358    }));
1359
1360    let order_bus = bus.clone();
1361    let order_persisted = persisted.clone();
1362    let order_alerts = alerts.clone();
1363    let order_orchestrator = orchestrator.clone();
1364    // Note: We don't pass strategy to order update handler to avoid lock contention
1365    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
1366    let order_recorder = recorder;
1367    handles.push(tokio::spawn(async move {
1368        let orchestrator = order_orchestrator.clone();
1369        let persisted = order_persisted.clone();
1370        let recorder = order_recorder;
1371        let mut stream = order_bus.subscribe();
1372        loop {
1373            match stream.recv().await {
1374                Ok(Event::OrderUpdate(evt)) => {
1375                    if let Some(handle) = recorder.as_ref() {
1376                        handle.record_order(evt.order.clone());
1377                    }
1378                    if let Err(err) = process_order_update_event(
1379                        evt.order,
1380                        orchestrator.clone(),
1381                        order_alerts.clone(),
1382                        state_repo.clone(),
1383                        persisted.clone(),
1384                    )
1385                    .await
1386                    {
1387                        warn!(error = %err, "order update handler failed");
1388                    }
1389                }
1390                Ok(_) => {}
1391                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1392                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1393                    warn!(lag = lag, "order subscriber lagged");
1394                    continue;
1395                }
1396            }
1397        }
1398    }));
1399
1400    handles
1401}
1402
1403#[allow(clippy::too_many_arguments)]
1404async fn process_tick_event(
1405    tick: Tick,
1406    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1407    strategy_ctx: Arc<Mutex<StrategyContext>>,
1408    metrics: Arc<LiveMetrics>,
1409    alerts: Arc<AlertManager>,
1410    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1411    portfolio: Arc<Mutex<Portfolio>>,
1412    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1413    persisted: Arc<Mutex<LiveState>>,
1414    bus: Arc<EventBus>,
1415    last_data_timestamp: Arc<AtomicI64>,
1416) -> Result<()> {
1417    metrics.inc_tick();
1418    metrics.update_staleness(0.0);
1419    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1420    last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1421    alerts.heartbeat().await;
1422    {
1423        let mut guard = market.lock().await;
1424        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1425            snapshot.last_trade = Some(tick.price);
1426            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1427        }
1428    }
1429    let mut drawdown_triggered = false;
1430    let mut snapshot_on_trigger = None;
1431    {
1432        let mut guard = portfolio.lock().await;
1433        let was_liquidate_only = guard.liquidate_only();
1434        match guard.update_market_data(&tick.symbol, tick.price) {
1435            Ok(_) => {
1436                if !was_liquidate_only && guard.liquidate_only() {
1437                    drawdown_triggered = true;
1438                    snapshot_on_trigger = Some(guard.snapshot());
1439                }
1440            }
1441            Err(err) => {
1442                warn!(
1443                    symbol = %tick.symbol,
1444                    error = %err,
1445                    "failed to refresh market data"
1446                );
1447            }
1448        }
1449    }
1450    {
1451        let mut state = persisted.lock().await;
1452        state.last_prices.insert(tick.symbol.clone(), tick.price);
1453        if drawdown_triggered {
1454            if let Some(snapshot) = snapshot_on_trigger.take() {
1455                state.portfolio = Some(snapshot);
1456            }
1457        }
1458    }
1459    if drawdown_triggered {
1460        persist_state(
1461            state_repo.clone(),
1462            persisted.clone(),
1463            Some(strategy.clone()),
1464        )
1465        .await?;
1466        alert_liquidate_only(alerts.clone()).await;
1467    }
1468    {
1469        let mut ctx = strategy_ctx.lock().await;
1470        ctx.push_tick(tick.clone());
1471        let lock_start = Instant::now();
1472        let mut strat = strategy.lock().await;
1473        log_strategy_lock("tick", lock_start.elapsed());
1474        let call_start = Instant::now();
1475        strat
1476            .on_tick(&ctx, &tick)
1477            .await
1478            .context("strategy failure on tick event")?;
1479        log_strategy_call("tick", call_start.elapsed());
1480    }
1481    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1482    debug!(symbol = %tick.symbol, price = %tick.price, "completed tick processing");
1483    Ok(())
1484}
1485
1486#[allow(clippy::too_many_arguments)]
1487async fn process_candle_event(
1488    candle: Candle,
1489    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1490    strategy_ctx: Arc<Mutex<StrategyContext>>,
1491    metrics: Arc<LiveMetrics>,
1492    alerts: Arc<AlertManager>,
1493    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1494    portfolio: Arc<Mutex<Portfolio>>,
1495    orchestrator: Arc<OrderOrchestrator>,
1496    exec_backend: ExecutionBackend,
1497    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1498    persisted: Arc<Mutex<LiveState>>,
1499    bus: Arc<EventBus>,
1500    last_data_timestamp: Arc<AtomicI64>,
1501) -> Result<()> {
1502    metrics.inc_candle();
1503    metrics.update_staleness(0.0);
1504    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1505    last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1506    alerts.heartbeat().await;
1507    metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1508    {
1509        let mut guard = market.lock().await;
1510        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1511            snapshot.last_candle = Some(candle.clone());
1512            snapshot.last_trade = Some(candle.close);
1513        }
1514    }
1515    if exec_backend.is_paper() {
1516        let client = orchestrator.execution_engine().client();
1517        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1518            paper.update_price(&candle.symbol, candle.close);
1519        }
1520    }
1521    let mut candle_drawdown_triggered = false;
1522    let mut candle_snapshot = None;
1523    {
1524        let mut guard = portfolio.lock().await;
1525        let was_liquidate_only = guard.liquidate_only();
1526        match guard.update_market_data(&candle.symbol, candle.close) {
1527            Ok(_) => {
1528                if !was_liquidate_only && guard.liquidate_only() {
1529                    candle_drawdown_triggered = true;
1530                    candle_snapshot = Some(guard.snapshot());
1531                }
1532            }
1533            Err(err) => {
1534                warn!(
1535                    symbol = %candle.symbol,
1536                    error = %err,
1537                    "failed to refresh market data"
1538                );
1539            }
1540        }
1541    }
1542    if candle_drawdown_triggered {
1543        if let Some(snapshot) = candle_snapshot.take() {
1544            let mut persisted_guard = persisted.lock().await;
1545            persisted_guard.portfolio = Some(snapshot);
1546        }
1547        alert_liquidate_only(alerts.clone()).await;
1548    }
1549    {
1550        let mut ctx = strategy_ctx.lock().await;
1551        ctx.push_candle(candle.clone());
1552        let lock_start = Instant::now();
1553        let mut strat = strategy.lock().await;
1554        log_strategy_lock("candle", lock_start.elapsed());
1555        let call_start = Instant::now();
1556        strat
1557            .on_candle(&ctx, &candle)
1558            .await
1559            .context("strategy failure on candle event")?;
1560        log_strategy_call("candle", call_start.elapsed());
1561    }
1562    {
1563        let mut snapshot = persisted.lock().await;
1564        snapshot.last_candle_ts = Some(candle.timestamp);
1565        snapshot
1566            .last_prices
1567            .insert(candle.symbol.clone(), candle.close);
1568    }
1569    persist_state(
1570        state_repo.clone(),
1571        persisted.clone(),
1572        Some(strategy.clone()),
1573    )
1574    .await?;
1575    let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1576    orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1577    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1578    debug!(symbol = %candle.symbol, close = %candle.close, "completed candle processing");
1579    Ok(())
1580}
1581
1582#[allow(clippy::too_many_arguments)]
1583async fn process_order_book_event(
1584    mut book: OrderBook,
1585    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1586    strategy_ctx: Arc<Mutex<StrategyContext>>,
1587    metrics: Arc<LiveMetrics>,
1588    alerts: Arc<AlertManager>,
1589    _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1590    bus: Arc<EventBus>,
1591    last_data_timestamp: Arc<AtomicI64>,
1592    driver: Arc<String>,
1593) -> Result<()> {
1594    metrics.update_staleness(0.0);
1595    alerts.heartbeat().await;
1596    last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1597    let driver_name = driver.as_str();
1598    let local_checksum = if let Some(cs) = book.local_checksum {
1599        cs
1600    } else {
1601        let computed = book.computed_checksum(None);
1602        book.local_checksum = Some(computed);
1603        computed
1604    };
1605    if let Some(expected) = book.exchange_checksum {
1606        if expected != local_checksum {
1607            metrics.inc_checksum_mismatch(driver_name, &book.symbol);
1608            alerts
1609                .order_book_checksum_mismatch(driver_name, &book.symbol, expected, local_checksum)
1610                .await;
1611        }
1612    }
1613    {
1614        let mut ctx = strategy_ctx.lock().await;
1615        ctx.push_order_book(book.clone());
1616        let lock_start = Instant::now();
1617        let mut strat = strategy.lock().await;
1618        log_strategy_lock("order_book", lock_start.elapsed());
1619        let call_start = Instant::now();
1620        strat
1621            .on_order_book(&ctx, &book)
1622            .await
1623            .context("strategy failure on order book")?;
1624        log_strategy_call("order_book", call_start.elapsed());
1625    }
1626    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1627    Ok(())
1628}
1629
1630async fn process_signal_event(
1631    signal: Signal,
1632    orchestrator: Arc<OrderOrchestrator>,
1633    portfolio: Arc<Mutex<Portfolio>>,
1634    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1635    persisted: Arc<Mutex<LiveState>>,
1636    alerts: Arc<AlertManager>,
1637    metrics: Arc<LiveMetrics>,
1638) -> Result<()> {
1639    let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1640    orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1641    match orchestrator.on_signal(&signal, &ctx).await {
1642        Ok(()) => {
1643            alerts.reset_order_failures().await;
1644        }
1645        Err(err) => {
1646            metrics.inc_order_failure();
1647            alerts
1648                .order_failure(&format!("orchestrator error: {err}"))
1649                .await;
1650        }
1651    }
1652    Ok(())
1653}
1654
1655fn log_strategy_lock(event: &str, wait: Duration) {
1656    let wait_ms = wait.as_secs_f64() * 1000.0;
1657    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1658        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1659    } else {
1660        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1661    }
1662}
1663
1664fn log_strategy_call(event: &str, elapsed: Duration) {
1665    let duration_ms = elapsed.as_secs_f64() * 1000.0;
1666    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1667        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1668    } else {
1669        trace!(target: "strategy", event, duration_ms, "strategy call completed");
1670    }
1671}
1672
1673#[allow(clippy::too_many_arguments)]
1674async fn process_fill_event(
1675    fill: Fill,
1676    portfolio: Arc<Mutex<Portfolio>>,
1677    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1678    strategy_ctx: Arc<Mutex<StrategyContext>>,
1679    orchestrator: Arc<OrderOrchestrator>,
1680    metrics: Arc<LiveMetrics>,
1681    alerts: Arc<AlertManager>,
1682    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1683    persisted: Arc<Mutex<LiveState>>,
1684) -> Result<()> {
1685    let mut drawdown_triggered = false;
1686    {
1687        let mut guard = portfolio.lock().await;
1688        let was_liquidate_only = guard.liquidate_only();
1689        guard
1690            .apply_fill(&fill)
1691            .context("Failed to apply fill to portfolio")?;
1692        if !was_liquidate_only && guard.liquidate_only() {
1693            drawdown_triggered = true;
1694        }
1695        let snapshot = guard.snapshot();
1696        let mut persisted_guard = persisted.lock().await;
1697        persisted_guard.portfolio = Some(snapshot);
1698    }
1699    {
1700        let positions = {
1701            let guard = portfolio.lock().await;
1702            guard.positions()
1703        };
1704        let mut ctx = strategy_ctx.lock().await;
1705        ctx.update_positions(positions);
1706    }
1707    orchestrator.on_fill(&fill).await.ok();
1708    {
1709        let ctx = strategy_ctx.lock().await;
1710        let lock_start = Instant::now();
1711        let mut strat = strategy.lock().await;
1712        log_strategy_lock("fill", lock_start.elapsed());
1713        let call_start = Instant::now();
1714        strat
1715            .on_fill(&ctx, &fill)
1716            .await
1717            .context("Strategy failed on fill event")?;
1718        log_strategy_call("fill", call_start.elapsed());
1719    }
1720    let equity = {
1721        let guard = portfolio.lock().await;
1722        guard.equity()
1723    };
1724    if let Some(value) = equity.to_f64() {
1725        metrics.update_equity(value);
1726    }
1727    alerts.update_equity(equity).await;
1728    metrics.inc_order();
1729    alerts
1730        .notify(
1731            "Order Filled",
1732            &format!(
1733                "order filled: {}@{} ({})",
1734                fill.fill_quantity,
1735                fill.fill_price,
1736                match fill.side {
1737                    Side::Buy => "buy",
1738                    Side::Sell => "sell",
1739                }
1740            ),
1741        )
1742        .await;
1743    if drawdown_triggered {
1744        alert_liquidate_only(alerts.clone()).await;
1745    }
1746    persist_state(
1747        state_repo.clone(),
1748        persisted.clone(),
1749        Some(strategy.clone()),
1750    )
1751    .await?;
1752    Ok(())
1753}
1754
1755async fn process_order_update_event(
1756    order: Order,
1757    orchestrator: Arc<OrderOrchestrator>,
1758    alerts: Arc<AlertManager>,
1759    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1760    persisted: Arc<Mutex<LiveState>>,
1761) -> Result<()> {
1762    orchestrator.on_order_update(&order);
1763    if matches!(order.status, OrderStatus::Rejected) {
1764        error!(
1765            order_id = %order.id,
1766            symbol = %order.request.symbol,
1767            "order rejected by exchange"
1768        );
1769        alerts.order_failure("order rejected by exchange").await;
1770        alerts
1771            .notify(
1772                "Order rejected",
1773                &format!(
1774                    "Order {} for {} was rejected",
1775                    order.id, order.request.symbol
1776                ),
1777            )
1778            .await;
1779    }
1780    {
1781        let mut snapshot = persisted.lock().await;
1782        let mut found = false;
1783        for existing in &mut snapshot.open_orders {
1784            if existing.id == order.id {
1785                *existing = order.clone();
1786                found = true;
1787                break;
1788            }
1789        }
1790        if !found {
1791            snapshot.open_orders.push(order.clone());
1792        }
1793        if matches!(
1794            order.status,
1795            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1796        ) {
1797            snapshot.open_orders.retain(|o| o.id != order.id);
1798        }
1799    }
1800    persist_state(state_repo, persisted, None).await?;
1801    Ok(())
1802}
1803
1804async fn emit_signals(
1805    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1806    bus: Arc<EventBus>,
1807    metrics: Arc<LiveMetrics>,
1808) {
1809    let signals = {
1810        let mut strat = strategy.lock().await;
1811        let drained = strat.drain_signals();
1812        debug!(count = drained.len(), "strategy drained signals");
1813        drained
1814    };
1815    if signals.is_empty() {
1816        return;
1817    }
1818    metrics.inc_signals(signals.len());
1819    for signal in signals {
1820        debug!(id = %signal.id, symbol = %signal.symbol, kind = ?signal.kind, "publishing signal event");
1821        bus.publish(Event::Signal(SignalEvent { signal }));
1822    }
1823}
1824
1825async fn persist_state(
1826    repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1827    persisted: Arc<Mutex<LiveState>>,
1828    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1829) -> Result<()> {
1830    if let Some(strat_lock) = strategy {
1831        // Snapshot strategy state before cloning the full state for persistence
1832        let strat = strat_lock.lock().await;
1833        if let Ok(json_state) = strat.snapshot() {
1834            let mut guard = persisted.lock().await;
1835            guard.strategy_state = Some(json_state);
1836        } else {
1837            warn!("failed to snapshot strategy state");
1838        }
1839    }
1840
1841    let snapshot = {
1842        let guard = persisted.lock().await;
1843        guard.clone()
1844    };
1845    tokio::task::spawn_blocking(move || repo.save(&snapshot))
1846        .await
1847        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1848        .map_err(|err| anyhow!(err.to_string()))
1849}
1850
1851async fn shared_risk_context(
1852    symbol: &str,
1853    portfolio: &Arc<Mutex<Portfolio>>,
1854    market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1855    persisted: &Arc<Mutex<LiveState>>,
1856) -> RiskContext {
1857    let (signed_qty, equity, liquidate_only) = {
1858        let guard = portfolio.lock().await;
1859        (
1860            guard.signed_position_qty(symbol),
1861            guard.equity(),
1862            guard.liquidate_only(),
1863        )
1864    };
1865    let observed_price = {
1866        let guard = market.lock().await;
1867        guard.get(symbol).and_then(|snapshot| snapshot.price())
1868    };
1869    let last_price = if let Some(price) = observed_price {
1870        price
1871    } else {
1872        let guard = persisted.lock().await;
1873        guard
1874            .last_prices
1875            .get(symbol)
1876            .copied()
1877            .unwrap_or(Decimal::ZERO)
1878    };
1879    RiskContext {
1880        signed_position_qty: signed_qty,
1881        portfolio_equity: equity,
1882        last_price,
1883        liquidate_only,
1884    }
1885}
1886
1887async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1888    alerts
1889        .notify(
1890            "Max drawdown triggered",
1891            "Portfolio entered liquidate-only mode; new exposure blocked until review",
1892        )
1893        .await;
1894}
1895
1896fn spawn_connection_monitor(
1897    shutdown: ShutdownSignal,
1898    flag: Arc<AtomicBool>,
1899    metrics: Arc<LiveMetrics>,
1900    stream: &'static str,
1901) -> JoinHandle<()> {
1902    tokio::spawn(async move {
1903        loop {
1904            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1905            if !shutdown.sleep(Duration::from_secs(5)).await {
1906                break;
1907            }
1908        }
1909    })
1910}
1911
1912fn spawn_order_timeout_monitor(
1913    orchestrator: Arc<OrderOrchestrator>,
1914    bus: Arc<EventBus>,
1915    alerts: Arc<AlertManager>,
1916    shutdown: ShutdownSignal,
1917) -> JoinHandle<()> {
1918    tokio::spawn(async move {
1919        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1920        loop {
1921            ticker.tick().await;
1922            if shutdown.triggered() {
1923                break;
1924            }
1925            match orchestrator.poll_stale_orders().await {
1926                Ok(updates) => {
1927                    for order in updates {
1928                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1929                            let message = format!(
1930                                "Order {} for {} timed out after {}s",
1931                                order.id,
1932                                order.request.symbol,
1933                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1934                            );
1935                            error!(%message);
1936                            alerts.order_failure(&message).await;
1937                            alerts.notify("Order timeout", &message).await;
1938                        }
1939                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1940                    }
1941                }
1942                Err(err) => {
1943                    warn!(error = %err, "order timeout monitor failed");
1944                }
1945            }
1946        }
1947    })
1948}
1949
1950async fn load_market_registry(
1951    client: Arc<dyn ExecutionClient>,
1952    settings: &LiveSessionSettings,
1953) -> Result<Arc<MarketRegistry>> {
1954    if let Some(path) = &settings.markets_file {
1955        let registry = MarketRegistry::load_from_file(path)
1956            .with_context(|| format!("failed to load markets from {}", path.display()))?;
1957        return Ok(Arc::new(registry));
1958    }
1959
1960    if settings.exec_backend.is_paper() {
1961        return Err(anyhow!(
1962            "paper execution requires --markets-file when exchange metadata is unavailable"
1963        ));
1964    }
1965
1966    let instruments = client
1967        .list_instruments(settings.category.as_path())
1968        .await
1969        .context("failed to fetch instruments from execution client")?;
1970    let registry =
1971        MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1972    Ok(Arc::new(registry))
1973}
1974
/// Spawns the task that maintains the Bybit private WebSocket stream.
///
/// On every successful connection the task:
/// 1. marks the private connection as up (flag and metric),
/// 2. re-sends the currently open orders for each symbol as
///    `BrokerEvent::OrderUpdate`,
/// 3. when the execution client is a `BybitClient`, re-sends the executions
///    since `last_sync` (or the last 30 minutes when no sync happened yet) as
///    `BrokerEvent::Fill`,
/// 4. forwards live `order` / `execution` frames until the stream ends or
///    shutdown is requested.
///
/// `last_sync` only advances after a successful execution reconciliation, so
/// a failed attempt is retried over the same window on the next reconnect.
/// When the stream ends the connection is marked as down and the task
/// reconnects; a failed connection attempt is retried after five seconds.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");

                    // Replay open orders so updates missed while disconnected
                    // reach the consumer.
                    for symbol in &symbols {
                        match exec_client.list_open_orders(symbol).await {
                            Ok(orders) => {
                                for order in orders {
                                    if let Err(err) =
                                        private_tx.send(BrokerEvent::OrderUpdate(order)).await
                                    {
                                        error!("failed to send reconciled order update: {err}");
                                    }
                                }
                            }
                            Err(e) => {
                                error!("failed to reconcile open orders for {symbol}: {e}");
                            }
                        }
                    }

                    if let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() {
                        let since = {
                            let guard = last_sync.lock().await;
                            guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
                        };
                        match bybit.list_executions_since(since).await {
                            Ok(fills) => {
                                for fill in fills {
                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await
                                    {
                                        error!("failed to send reconciled fill: {err}");
                                    }
                                }
                                // Advance the watermark only on success; a
                                // failed fetch must be retried over the same
                                // window or those fills are lost.
                                let mut guard = last_sync.lock().await;
                                *guard = Some(Utc::now());
                            }
                            Err(e) => {
                                error!("failed to reconcile executions since {:?}: {}", since, e);
                            }
                        }
                    }

                    while !shutdown.triggered() {
                        // Race the read against shutdown so a quiet stream
                        // cannot keep the task alive after shutdown.
                        let next = tokio::select! {
                            _ = shutdown.wait() => None,
                            item = socket.next() => item,
                        };
                        let Some(msg) = next else {
                            break;
                        };
                        if shutdown.triggered() {
                            break;
                        }
                        if let Ok(Message::Text(text)) = msg {
                            forward_bybit_private_message(&text, &private_tx).await;
                        }
                    }

                    // The stream is gone (or we are shutting down): do not
                    // keep reporting the connection as healthy.
                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                    if !shutdown.triggered() {
                        warn!("Bybit private WebSocket stream ended; reconnecting");
                    }
                }
                Err(e) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                    // `sleep` returns false when the wait should not continue
                    // (same contract the connection monitor relies on).
                    if !shutdown.sleep(Duration::from_secs(5)).await {
                        break;
                    }
                }
            }
            if shutdown.triggered() {
                break;
            }
        }
    });
}

/// Decodes one text frame from the Bybit private stream and forwards the
/// orders or fills it carries to `private_tx`.
///
/// Frames that are not JSON, have no `topic`, or carry a topic other than
/// `order` / `execution` are ignored. Payloads that fail to decode or convert
/// are logged and skipped.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    enum Topic {
        Order,
        Execution,
    }

    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return;
    };
    // Resolve the topic before consuming `value` so the payload can be moved
    // into the typed decoder instead of being deep-cloned.
    let topic = match value.get("topic").and_then(|v| v.as_str()) {
        Some("order") => Topic::Order,
        Some("execution") => Topic::Execution,
        _ => return,
    };

    match topic {
        Topic::Order => {
            let msg = match serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
                Ok(msg) => msg,
                Err(err) => {
                    warn!("failed to decode Bybit private order message: {err}");
                    return;
                }
            };
            for update in msg.data {
                let Ok(order) = update.to_tesser_order(None) else {
                    warn!("failed to convert Bybit private order update");
                    continue;
                };
                if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                    error!("failed to send private order update: {err}");
                }
            }
        }
        Topic::Execution => {
            let msg = match serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value) {
                Ok(msg) => msg,
                Err(err) => {
                    warn!("failed to decode Bybit private execution message: {err}");
                    return;
                }
            };
            for exec in msg.data {
                let Ok(fill) = exec.to_tesser_fill() else {
                    warn!("failed to convert Bybit private execution");
                    continue;
                };
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send private fill event: {err}");
                }
            }
        }
    }
}
2112
/// Spawns the task that maintains the Binance user-data stream.
///
/// Each iteration starts a user stream to obtain a listen key, connects the
/// user-data WebSocket, forwards order updates and fills to `private_tx`, and
/// runs a keepalive call every 30 minutes. The connection is torn down and
/// re-established when a `ListenKeyExpired` event arrives. The task returns
/// when shutdown is requested or when the execution client is not a
/// `BinanceClient`.
///
/// All retry waits go through `shutdown.sleep`, so a shutdown request also
/// ends the loop while the stream cannot be started.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // `sleep` returns false when the wait should not continue
                    // (same contract the connection monitor relies on);
                    // without this check the retry loop would outlive shutdown.
                    if !shutdown.sleep(Duration::from_secs(5)).await {
                        return;
                    }
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    // NOTE(review): `blocking_send` panics when called from an
                    // async runtime thread — confirm the stream invokes this
                    // callback from a non-runtime thread.
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            // A full channel means a reconnect is already pending.
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if let Err(err) = client.keepalive_user_stream().await {
                                warn!("binance user stream keepalive failed: {err}");
                                break;
                            }
                        }
                    });
                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {
                            keepalive_handle.abort();
                            let _ = user_stream.unsubscribe().await;
                            return;
                        }
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            if !shutdown.sleep(Duration::from_secs(5)).await {
                break;
            }
        }
    });
}