tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, AtomicI64, Ordering},
6    Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23    static INIT: Once = Once::new();
24    INIT.call_once(|| {
25        register_connector_factory(Arc::new(PaperFactory::default()));
26        #[cfg(feature = "bybit")]
27        register_bybit_factory();
28        #[cfg(feature = "binance")]
29        register_binance_factory();
30    });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35    fill_from_update, order_from_update, register_factory as register_binance_factory,
36    ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37    BinanceClient,
38};
39use tesser_broker::{
40    get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41    ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
48use tesser_core::{
49    Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
50    Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55    TickEvent,
56};
57use tesser_execution::{
58    BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
59    RiskContext, RiskLimits, SqliteAlgoStateRepository,
60};
61use tesser_markets::MarketRegistry;
62use tesser_paper::{PaperExecutionClient, PaperFactory};
63use tesser_portfolio::{
64    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
65};
66use tesser_strategy::{Strategy, StrategyContext};
67
68use crate::alerts::{AlertDispatcher, AlertManager};
69use crate::control;
70use crate::telemetry::{spawn_metrics_server, LiveMetrics};
71use crate::PublicChannel;
72
/// Unified event type for asynchronous updates from the broker.
///
/// Values travel over the private-event channel and are republished on the
/// event bus by the live loop.
#[derive(Debug)]
pub enum BrokerEvent {
    /// An order state change; republished as `Event::OrderUpdate`.
    OrderUpdate(Order),
    /// An execution report; republished as `Event::Fill`.
    Fill(Fill),
}
79
/// Selects how orders are executed during a live session.
///
/// Derives clap's `ValueEnum`, with variant names rendered in kebab-case.
#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum ExecutionBackend {
    /// Simulated execution; no private exchange stream or state
    /// reconciliation is started for this backend.
    Paper,
    /// Orders go through the execution client created by the configured
    /// driver's connector factory.
    Live,
}
86
87impl ExecutionBackend {
88    fn is_paper(self) -> bool {
89        matches!(self, Self::Paper)
90    }
91}
92
/// Default order book depth.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order book depth.
///
/// Being a `const fn`, the value is also usable in constant contexts.
pub const fn default_order_book_depth() -> usize {
    let depth = DEFAULT_ORDER_BOOK_DEPTH;
    depth
}
// NOTE(review): neither constant is used in the part of the file reviewed here.
// Judging by the names they are the durations beyond which waiting for the
// strategy lock / running a strategy callback is reported — confirm at the use sites.
/// Warning threshold for the strategy lock: 25 ms.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
/// Warning threshold for a strategy call: 250 ms.
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
100
/// Market data source polled by the live loop.
///
/// Each method yields the next item of its kind or a broker error. The live
/// loop treats `Ok(None)` as "nothing new" and does not count it as progress.
#[async_trait::async_trait]
trait LiveMarketStream: Send {
    /// Fetches the next tick, if any.
    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
    /// Fetches the next candle, if any.
    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
    /// Fetches the next order book, if any.
    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
}
107
108struct FactoryStreamAdapter {
109    inner: Box<dyn ConnectorStream>,
110}
111
112impl FactoryStreamAdapter {
113    fn new(inner: Box<dyn ConnectorStream>) -> Self {
114        Self { inner }
115    }
116}
117
118#[async_trait::async_trait]
119impl LiveMarketStream for FactoryStreamAdapter {
120    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
121        self.inner.next_tick().await
122    }
123
124    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
125        self.inner.next_candle().await
126    }
127
128    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
129        self.inner.next_order_book().await
130    }
131}
132
/// Configuration for one live trading session.
pub struct LiveSessionSettings {
    /// Public channel category; its `as_path()` value is sent as the
    /// `category` entry of the market-stream metadata.
    pub category: PublicChannel,
    /// Interval passed to the connector stream's `subscribe` call.
    pub interval: Interval,
    /// Fixed order size handed to `FixedOrderSizer`; must be strictly positive.
    pub quantity: Quantity,
    /// Slippage passed to `PaperExecutionClient` when the paper backend runs
    /// against a non-paper driver (basis points, per the field name).
    pub slippage_bps: Decimal,
    /// Fee passed to `PaperExecutionClient` in the same situation
    /// (basis points, per the field name).
    pub fee_bps: Decimal,
    /// Value passed to `StrategyContext::new`.
    pub history: usize,
    /// Address the metrics server is spawned on.
    pub metrics_addr: SocketAddr,
    /// Path of the SQLite live-state store. The algorithm state store uses
    /// the same path with its extension replaced by `algos.db`.
    pub state_path: PathBuf,
    /// Starting balances copied into the `PortfolioConfig`.
    pub initial_balances: HashMap<Symbol, Decimal>,
    /// Reporting currency copied into the `PortfolioConfig`; also the
    /// currency whose balance is compared during reconciliation.
    pub reporting_currency: Symbol,
    /// Optional markets file. Not read in the code reviewed here —
    /// presumably consumed by `load_market_registry`; verify.
    pub markets_file: Option<PathBuf>,
    /// Alerting configuration handed to the `AlertManager`; its
    /// `webhook_url` also configures the `AlertDispatcher`.
    pub alerting: AlertingConfig,
    /// Whether orders are simulated or sent to the exchange.
    pub exec_backend: ExecutionBackend,
    /// Risk configuration: the quantity limits feed the pre-trade risk
    /// checker, `max_drawdown` feeds the `PortfolioConfig`.
    pub risk: RiskManagementConfig,
    /// Delay between periodic state reconciliations (non-paper backends only).
    pub reconciliation_interval: Duration,
    /// Threshold handed to the reconciliation context (non-paper backends only).
    pub reconciliation_threshold: Decimal,
    /// Name used to look up the registered connector factory.
    pub driver: String,
    /// Sent as the `orderbook_depth` entry of the market-stream metadata.
    pub orderbook_depth: usize,
    /// When set, used as the root of the Parquet flight recorder.
    pub record_path: Option<PathBuf>,
    /// Address passed to `control::spawn_control_plane`.
    pub control_addr: SocketAddr,
}
155
156impl LiveSessionSettings {
157    fn risk_limits(&self) -> RiskLimits {
158        RiskLimits {
159            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
160            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
161        }
162    }
163}
164
165pub async fn run_live(
166    strategy: Box<dyn Strategy>,
167    symbols: Vec<String>,
168    exchange: ExchangeConfig,
169    settings: LiveSessionSettings,
170) -> Result<()> {
171    run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
172}
173
174/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
175pub async fn run_live_with_shutdown(
176    strategy: Box<dyn Strategy>,
177    symbols: Vec<String>,
178    exchange: ExchangeConfig,
179    settings: LiveSessionSettings,
180    shutdown: ShutdownSignal,
181) -> Result<()> {
182    if symbols.is_empty() {
183        return Err(anyhow!("strategy did not declare any subscriptions"));
184    }
185    if settings.quantity <= Decimal::ZERO {
186        return Err(anyhow!("--quantity must be positive"));
187    }
188
189    let public_connection = Arc::new(AtomicBool::new(false));
190    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
191        Some(Arc::new(AtomicBool::new(false)))
192    } else {
193        None
194    };
195    ensure_builtin_connectors_registered();
196    let connector_payload = build_exchange_payload(&exchange, &settings);
197    let connector_factory = get_connector_factory(&settings.driver)
198        .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
199    let stream_config = ConnectorStreamConfig {
200        ws_url: Some(exchange.ws_url.clone()),
201        metadata: json!({
202            "category": settings.category.as_path(),
203            "symbols": symbols.clone(),
204            "orderbook_depth": settings.orderbook_depth,
205        }),
206        connection_status: Some(public_connection.clone()),
207    };
208    let mut connector_stream = connector_factory
209        .create_market_stream(&connector_payload, stream_config)
210        .await
211        .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
212    connector_stream
213        .subscribe(&symbols, settings.interval)
214        .await
215        .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
216    let market_stream: Box<dyn LiveMarketStream> =
217        Box::new(FactoryStreamAdapter::new(connector_stream));
218
219    let execution_client =
220        build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
221    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
222    if matches!(settings.exec_backend, ExecutionBackend::Live) {
223        info!(
224            rest = %exchange.rest_url,
225            driver = ?settings.driver,
226            "live execution enabled via {:?} REST",
227            settings.driver
228        );
229    }
230    let risk_checker: Arc<dyn PreTradeRiskChecker> =
231        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
232    let execution = ExecutionEngine::new(
233        execution_client.clone(),
234        Box::new(FixedOrderSizer {
235            quantity: settings.quantity,
236        }),
237        risk_checker,
238    );
239
240    // Create algorithm state repository
241    let algo_repo_path = settings.state_path.with_extension("algos.db");
242    let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
243
244    // Create orchestrator with execution engine
245    let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
246
247    let runtime = LiveRuntime::new(
248        market_stream,
249        strategy,
250        symbols,
251        orchestrator,
252        settings,
253        exchange.ws_url.clone(),
254        market_registry,
255        shutdown,
256        public_connection,
257        private_connection,
258    )
259    .await?;
260    runtime.run().await
261}
262
263async fn build_execution_client(
264    settings: &LiveSessionSettings,
265    connector_factory: Arc<dyn ConnectorFactory>,
266    connector_payload: &Value,
267) -> Result<Arc<dyn ExecutionClient>> {
268    match settings.exec_backend {
269        ExecutionBackend::Paper => {
270            if settings.driver == "paper" {
271                return connector_factory
272                    .create_execution_client(connector_payload)
273                    .await
274                    .map_err(|err| anyhow!("failed to create execution client: {err}"));
275            }
276            Ok(Arc::new(PaperExecutionClient::new(
277                "paper".to_string(),
278                vec!["BTCUSDT".to_string()],
279                settings.slippage_bps,
280                settings.fee_bps,
281            )))
282        }
283        ExecutionBackend::Live => connector_factory
284            .create_execution_client(connector_payload)
285            .await
286            .map_err(|err| anyhow!("failed to create execution client: {err}")),
287    }
288}
289
/// Owns everything a running live session needs: the market stream, the shared
/// state handles and the handles of every background task it spawned.
struct LiveRuntime {
    /// Market data source polled by the main loop.
    market: Box<dyn LiveMarketStream>,
    /// Order orchestrator; receives a timer tick from the main loop.
    orchestrator: Arc<OrderOrchestrator>,
    /// Repository used to persist the live state at shutdown.
    state_repo: Arc<dyn StateRepository>,
    /// In-memory copy of the persisted live state.
    persisted: Arc<Mutex<LiveState>>,
    /// Bus on which market and broker events are published.
    event_bus: Arc<EventBus>,
    /// Flight recorder, present only when recording was requested and started.
    recorder: Option<ParquetRecorder>,
    /// Control plane server task; awaited during shutdown.
    control_task: Option<JoinHandle<()>>,
    /// Signal that ends the main loop once triggered.
    shutdown: ShutdownSignal,
    /// Metrics server task; aborted during shutdown.
    metrics_task: JoinHandle<()>,
    /// Alert watchdog task, if one was spawned; aborted during shutdown.
    alert_task: Option<JoinHandle<()>>,
    /// Periodic reconciliation task (non-paper backends only).
    reconciliation_task: Option<JoinHandle<()>>,
    /// Context for state reconciliation (non-paper backends only).
    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
    /// Receiving end of the private broker event channel.
    private_event_rx: mpsc::Receiver<BrokerEvent>,
    // Kept on the struct but never read through it, hence the lint exemption;
    // a clone is handed to the Bybit private stream.
    #[allow(dead_code)]
    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    /// Event bus subscriber tasks; aborted during shutdown.
    subscriber_handles: Vec<JoinHandle<()>>,
    /// Connection monitor tasks; aborted during shutdown.
    connection_monitors: Vec<JoinHandle<()>>,
    /// Order timeout monitor task; aborted during shutdown.
    order_timeout_task: JoinHandle<()>,
    /// The strategy, shared with the subscriber tasks and persisted at shutdown.
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Public connection flag; held only to keep it alive with the runtime.
    _public_connection: Arc<AtomicBool>,
    /// Private connection flag, when one exists; held only to keep it alive.
    _private_connection: Option<Arc<AtomicBool>>,
}
313
314impl LiveRuntime {
315    #[allow(clippy::too_many_arguments)]
316    async fn new(
317        market: Box<dyn LiveMarketStream>,
318        mut strategy: Box<dyn Strategy>,
319        symbols: Vec<String>,
320        orchestrator: OrderOrchestrator,
321        settings: LiveSessionSettings,
322        #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
323        market_registry: Arc<MarketRegistry>,
324        shutdown: ShutdownSignal,
325        public_connection: Arc<AtomicBool>,
326        private_connection: Option<Arc<AtomicBool>>,
327    ) -> Result<Self> {
328        let mut strategy_ctx = StrategyContext::new(settings.history);
329        let state_repo: Arc<dyn StateRepository> =
330            Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
331        let mut persisted = match tokio::task::spawn_blocking({
332            let repo = state_repo.clone();
333            move || repo.load()
334        })
335        .await
336        {
337            Ok(Ok(state)) => state,
338            Ok(Err(err)) => {
339                warn!(error = %err, "failed to load live state; starting from defaults");
340                LiveState::default()
341            }
342            Err(err) => {
343                warn!(error = %err, "state load task failed; starting from defaults");
344                LiveState::default()
345            }
346        };
347        let mut live_bootstrap = None;
348        if matches!(settings.exec_backend, ExecutionBackend::Live) {
349            info!("Synchronizing portfolio state from exchange");
350            let execution_client = orchestrator.execution_engine().client();
351            let positions = execution_client
352                .positions()
353                .await
354                .context("failed to fetch remote positions")?;
355            let balances = execution_client
356                .account_balances()
357                .await
358                .context("failed to fetch remote account balances")?;
359            let mut open_orders = Vec::new();
360            for symbol in &symbols {
361                let mut symbol_orders = execution_client
362                    .list_open_orders(symbol)
363                    .await
364                    .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
365                open_orders.append(&mut symbol_orders);
366            }
367            persisted.open_orders = open_orders;
368            live_bootstrap = Some((positions, balances));
369        }
370
371        let portfolio_cfg = PortfolioConfig {
372            initial_balances: settings.initial_balances.clone(),
373            reporting_currency: settings.reporting_currency.clone(),
374            max_drawdown: Some(settings.risk.max_drawdown),
375        };
376        let portfolio = if let Some((positions, balances)) = live_bootstrap {
377            Portfolio::from_exchange_state(
378                positions,
379                balances,
380                portfolio_cfg.clone(),
381                market_registry.clone(),
382            )
383        } else if let Some(snapshot) = persisted.portfolio.take() {
384            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
385        } else {
386            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
387        };
388        strategy_ctx.update_positions(portfolio.positions());
389        // Restore strategy state if found in persistence
390        if let Some(state) = persisted.strategy_state.take() {
391            info!("restoring strategy state from persistence");
392            strategy
393                .restore(state)
394                .context("failed to restore strategy state")?;
395        }
396        persisted.portfolio = Some(portfolio.snapshot());
397
398        let mut market_snapshots = HashMap::new();
399        for symbol in &symbols {
400            let mut snapshot = MarketSnapshot::default();
401            if let Some(price) = persisted.last_prices.get(symbol).copied() {
402                snapshot.last_trade = Some(price);
403            }
404            market_snapshots.insert(symbol.clone(), snapshot);
405        }
406
407        let metrics = LiveMetrics::new();
408        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
409        if let Some(flag) = &private_connection {
410            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
411        }
412        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
413        let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
414        let alerts = AlertManager::new(
415            settings.alerting,
416            dispatcher,
417            Some(public_connection.clone()),
418            private_connection.clone(),
419        );
420        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
421        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
422        let alerts = Arc::new(alerts);
423        let alert_task = alerts.spawn_watchdog();
424        let metrics = Arc::new(metrics);
425        let mut connection_monitors = Vec::new();
426        connection_monitors.push(spawn_connection_monitor(
427            shutdown.clone(),
428            public_connection.clone(),
429            metrics.clone(),
430            "public",
431        ));
432        if let Some(flag) = private_connection.clone() {
433            connection_monitors.push(spawn_connection_monitor(
434                shutdown.clone(),
435                flag,
436                metrics.clone(),
437                "private",
438            ));
439        }
440
441        if !settings.exec_backend.is_paper() {
442            let execution_engine = orchestrator.execution_engine();
443            let exec_client = execution_engine.client();
444            match settings.driver.as_str() {
445                "bybit" | "" => {
446                    #[cfg(feature = "bybit")]
447                    {
448                        let bybit = exec_client
449                            .as_ref()
450                            .as_any()
451                            .downcast_ref::<BybitClient>()
452                            .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
453                        let creds = bybit
454                            .get_credentials()
455                            .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
456                        spawn_bybit_private_stream(
457                            creds,
458                            bybit.get_ws_url(),
459                            private_event_tx.clone(),
460                            exec_client.clone(),
461                            symbols.clone(),
462                            last_private_sync.clone(),
463                            private_connection.clone(),
464                            metrics.clone(),
465                            shutdown.clone(),
466                        );
467                    }
468                    #[cfg(not(feature = "bybit"))]
469                    {
470                        bail!("driver 'bybit' is unavailable without the 'bybit' feature");
471                    }
472                }
473                "binance" => {
474                    #[cfg(feature = "binance")]
475                    {
476                        spawn_binance_private_stream(
477                            exec_client.clone(),
478                            exchange_ws_url.clone(),
479                            private_event_tx.clone(),
480                            private_connection.clone(),
481                            metrics.clone(),
482                            shutdown.clone(),
483                        );
484                    }
485                    #[cfg(not(feature = "binance"))]
486                    {
487                        bail!("driver 'binance' is unavailable without the 'binance' feature");
488                    }
489                }
490                "paper" => {}
491                other => {
492                    bail!("private stream unsupported for driver '{other}'");
493                }
494            }
495        }
496
497        let recorder = if let Some(record_path) = settings.record_path.clone() {
498            let config = RecorderConfig {
499                root: record_path.clone(),
500                ..RecorderConfig::default()
501            };
502            match ParquetRecorder::spawn(config).await {
503                Ok(recorder) => {
504                    info!(path = %record_path.display(), "flight recorder enabled");
505                    Some(recorder)
506                }
507                Err(err) => {
508                    warn!(
509                        error = %err,
510                        path = %record_path.display(),
511                        "failed to start flight recorder"
512                    );
513                    None
514                }
515            }
516        } else {
517            None
518        };
519        let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
520
521        let strategy = Arc::new(Mutex::new(strategy));
522        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
523        let portfolio = Arc::new(Mutex::new(portfolio));
524        let market_cache = Arc::new(Mutex::new(market_snapshots));
525        let persisted = Arc::new(Mutex::new(persisted));
526        let orchestrator = Arc::new(orchestrator);
527        let event_bus = Arc::new(EventBus::new(2048));
528        let last_data_timestamp = Arc::new(AtomicI64::new(0));
529        let control_task = control::spawn_control_plane(
530            settings.control_addr,
531            portfolio.clone(),
532            orchestrator.clone(),
533            persisted.clone(),
534            last_data_timestamp.clone(),
535            shutdown.clone(),
536        );
537        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
538            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
539                client: orchestrator.execution_engine().client(),
540                portfolio: portfolio.clone(),
541                persisted: persisted.clone(),
542                state_repo: state_repo.clone(),
543                alerts: alerts.clone(),
544                metrics: metrics.clone(),
545                reporting_currency: settings.reporting_currency.clone(),
546                threshold: settings.reconciliation_threshold,
547            }))
548        });
549        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
550            spawn_reconciliation_loop(
551                ctx.clone(),
552                shutdown.clone(),
553                settings.reconciliation_interval,
554            )
555        });
556        let subscriber_handles = spawn_event_subscribers(
557            event_bus.clone(),
558            strategy.clone(),
559            strategy_ctx.clone(),
560            orchestrator.clone(),
561            portfolio.clone(),
562            metrics.clone(),
563            alerts.clone(),
564            market_cache.clone(),
565            state_repo.clone(),
566            persisted.clone(),
567            settings.exec_backend,
568            recorder_handle.clone(),
569            last_data_timestamp.clone(),
570        );
571        let order_timeout_task = spawn_order_timeout_monitor(
572            orchestrator.clone(),
573            event_bus.clone(),
574            alerts.clone(),
575            shutdown.clone(),
576        );
577
578        info!(
579            symbols = ?symbols,
580            category = ?settings.category,
581            metrics_addr = %settings.metrics_addr,
582            state_path = %settings.state_path.display(),
583            history = settings.history,
584            "market stream ready"
585        );
586
587        for symbol in &symbols {
588            let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
589            orchestrator.update_risk_context(symbol.clone(), ctx);
590        }
591
592        Ok(Self {
593            market,
594            orchestrator,
595            state_repo,
596            persisted,
597            event_bus,
598            recorder,
599            control_task: Some(control_task),
600            shutdown,
601            metrics_task,
602            alert_task,
603            reconciliation_task,
604            reconciliation_ctx,
605            private_event_rx,
606            last_private_sync,
607            subscriber_handles,
608            connection_monitors,
609            order_timeout_task,
610            strategy,
611            _public_connection: public_connection,
612            _private_connection: private_connection,
613        })
614    }
615
616    async fn run(mut self) -> Result<()> {
617        info!("live session started");
618        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
619            perform_state_reconciliation(ctx.as_ref())
620                .await
621                .context("initial state reconciliation failed")?;
622        }
623        let backoff = Duration::from_millis(200);
624        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
625
626        while !self.shutdown.triggered() {
627            let mut progressed = false;
628
629            if let Some(tick) = self.market.next_tick().await? {
630                progressed = true;
631                self.event_bus.publish(Event::Tick(TickEvent { tick }));
632            }
633
634            if let Some(candle) = self.market.next_candle().await? {
635                progressed = true;
636                self.event_bus
637                    .publish(Event::Candle(CandleEvent { candle }));
638            }
639
640            if let Some(book) = self.market.next_order_book().await? {
641                progressed = true;
642                self.event_bus
643                    .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
644            }
645
646            tokio::select! {
647                biased;
648                Some(event) = self.private_event_rx.recv() => {
649                    progressed = true;
650                    match event {
651                        BrokerEvent::OrderUpdate(order) => {
652                            info!(
653                                order_id = %order.id,
654                                status = ?order.status,
655                                symbol = %order.request.symbol,
656                                "received private order update"
657                            );
658                            self.event_bus
659                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
660                        }
661                        BrokerEvent::Fill(fill) => {
662                            info!(
663                                order_id = %fill.order_id,
664                                symbol = %fill.symbol,
665                                qty = %fill.fill_quantity,
666                                price = %fill.fill_price,
667                                "received private fill"
668                            );
669                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
670                        }
671                    }
672                }
673                _ = orchestrator_timer.tick() => {
674                    // Drive TWAP and other time-based algorithms
675                    if let Err(e) = self.orchestrator.on_timer_tick().await {
676                        error!("Orchestrator timer tick failed: {}", e);
677                    }
678                }
679                else => {}
680            }
681
682            if !progressed && !self.shutdown.sleep(backoff).await {
683                break;
684            }
685        }
686        info!("live session stopping");
687        self.metrics_task.abort();
688        if let Some(handle) = self.alert_task.take() {
689            handle.abort();
690        }
691        if let Some(handle) = self.reconciliation_task.take() {
692            handle.abort();
693        }
694        self.order_timeout_task.abort();
695        for handle in self.subscriber_handles.drain(..) {
696            handle.abort();
697        }
698        for handle in self.connection_monitors.drain(..) {
699            handle.abort();
700        }
701        if let Err(err) = persist_state(
702            self.state_repo.clone(),
703            self.persisted.clone(),
704            Some(self.strategy.clone()),
705        )
706        .await
707        {
708            warn!(error = %err, "failed to persist shutdown state");
709        }
710        if let Some(task) = self.control_task.take() {
711            if let Err(err) = task.await {
712                warn!(error = %err, "control plane server task aborted");
713            }
714        }
715        if let Some(recorder) = self.recorder.take() {
716            if let Err(err) = recorder.shutdown().await {
717                warn!(error = %err, "failed to flush flight recorder");
718            }
719        }
720        Ok(())
721    }
722}
723
/// Everything a state reconciliation pass needs to compare local state with
/// the exchange and report the differences.
struct ReconciliationContext {
    /// Execution client queried for remote positions and balances.
    client: Arc<dyn ExecutionClient>,
    /// Local portfolio whose positions and cash are compared to the exchange.
    portfolio: Arc<Mutex<Portfolio>>,
    /// Shared handle to the persisted live state.
    persisted: Arc<Mutex<LiveState>>,
    /// Shared handle to the state repository.
    state_repo: Arc<dyn StateRepository>,
    /// Shared handle to the alert manager.
    alerts: Arc<AlertManager>,
    /// Metrics sink receiving the position and balance differences.
    metrics: Arc<LiveMetrics>,
    /// Currency whose balance is compared against local cash.
    reporting_currency: Symbol,
    /// Normalized difference at or above which a mismatch is logged as an
    /// error; always strictly positive after construction.
    threshold: Decimal,
}
734
/// Constructor arguments for [`ReconciliationContext::new`].
///
/// Mirrors the fields of [`ReconciliationContext`]; the only value the
/// constructor may change is `threshold`.
struct ReconciliationContextConfig {
    /// Execution client used to query the exchange.
    client: Arc<dyn ExecutionClient>,
    /// Local portfolio handle.
    portfolio: Arc<Mutex<Portfolio>>,
    /// Persisted live state handle.
    persisted: Arc<Mutex<LiveState>>,
    /// State repository handle.
    state_repo: Arc<dyn StateRepository>,
    /// Alert manager handle.
    alerts: Arc<AlertManager>,
    /// Metrics handle.
    metrics: Arc<LiveMetrics>,
    /// Currency whose balance is reconciled.
    reporting_currency: Symbol,
    /// Requested threshold; non-positive values are replaced by the constructor.
    threshold: Decimal,
}
745
746impl ReconciliationContext {
747    fn new(config: ReconciliationContextConfig) -> Self {
748        let ReconciliationContextConfig {
749            client,
750            portfolio,
751            persisted,
752            state_repo,
753            alerts,
754            metrics,
755            reporting_currency,
756            threshold,
757        } = config;
758        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
759        let threshold = if threshold <= Decimal::ZERO {
760            min_threshold
761        } else {
762            threshold
763        };
764        Self {
765            client,
766            portfolio,
767            persisted,
768            state_repo,
769            alerts,
770            metrics,
771            reporting_currency,
772            threshold,
773        }
774    }
775}
776
777fn spawn_reconciliation_loop(
778    ctx: Arc<ReconciliationContext>,
779    shutdown: ShutdownSignal,
780    interval: Duration,
781) -> JoinHandle<()> {
782    tokio::spawn(async move {
783        while shutdown.sleep(interval).await {
784            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
785                error!(error = %err, "periodic state reconciliation failed");
786            }
787        }
788    })
789}
790
791async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
792    info!("running state reconciliation");
793    let remote_positions = ctx
794        .client
795        .positions()
796        .await
797        .context("failed to fetch remote positions")?;
798    let remote_balances = ctx
799        .client
800        .account_balances()
801        .await
802        .context("failed to fetch remote balances")?;
803    let (local_positions, local_cash) = {
804        let guard = ctx.portfolio.lock().await;
805        (guard.positions(), guard.cash())
806    };
807
808    let remote_map = positions_to_map(remote_positions);
809    let local_map = positions_to_map(local_positions);
810    let mut tracked_symbols: HashSet<String> = HashSet::new();
811    tracked_symbols.extend(remote_map.keys().cloned());
812    tracked_symbols.extend(local_map.keys().cloned());
813
814    let mut severe_findings = Vec::new();
815    for symbol in tracked_symbols {
816        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
817        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
818        let diff = (local_qty - remote_qty).abs();
819        let diff_value = diff.to_f64().unwrap_or(0.0);
820        ctx.metrics.update_position_diff(&symbol, diff_value);
821        if diff > Decimal::ZERO {
822            warn!(
823                symbol = %symbol,
824                local = %local_qty,
825                remote = %remote_qty,
826                diff = %diff,
827                "position mismatch detected during reconciliation"
828            );
829            let pct = normalize_diff(diff, remote_qty);
830            if pct >= ctx.threshold {
831                error!(
832                    symbol = %symbol,
833                    local = %local_qty,
834                    remote = %remote_qty,
835                    diff = %diff,
836                    pct = %pct,
837                    "position mismatch exceeds threshold"
838                );
839                severe_findings.push(format!(
840                    "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
841                ));
842            }
843        }
844    }
845
846    let reporting = ctx.reporting_currency.as_str();
847    let remote_cash = remote_balances
848        .iter()
849        .find(|balance| balance.currency == reporting)
850        .map(|balance| balance.available)
851        .unwrap_or_else(|| Decimal::ZERO);
852    let cash_diff = (remote_cash - local_cash).abs();
853    ctx.metrics
854        .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
855    if cash_diff > Decimal::ZERO {
856        warn!(
857            currency = %reporting,
858            local = %local_cash,
859            remote = %remote_cash,
860            diff = %cash_diff,
861            "balance mismatch detected during reconciliation"
862        );
863        let pct = normalize_diff(cash_diff, remote_cash);
864        if pct >= ctx.threshold {
865            error!(
866                currency = %reporting,
867                local = %local_cash,
868                remote = %remote_cash,
869                diff = %cash_diff,
870                pct = %pct,
871                "balance mismatch exceeds threshold"
872            );
873            severe_findings.push(format!(
874                "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
875            ));
876        }
877    }
878
879    if severe_findings.is_empty() {
880        info!("state reconciliation complete with no critical divergence");
881        return Ok(());
882    }
883
884    let alert_body = severe_findings.join("; ");
885    ctx.alerts
886        .notify("State reconciliation divergence", &alert_body)
887        .await;
888    enforce_liquidate_only(ctx).await;
889    Ok(())
890}
891
892async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
893    let snapshot = {
894        let mut guard = ctx.portfolio.lock().await;
895        if !guard.set_liquidate_only(true) {
896            return;
897        }
898        info!("entering liquidate-only mode due to reconciliation divergence");
899        guard.snapshot()
900    };
901    {
902        let mut state = ctx.persisted.lock().await;
903        state.portfolio = Some(snapshot);
904    }
905    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
906        warn!(error = %err, "failed to persist liquidate-only transition");
907    }
908}
909
910fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
911    let mut map = HashMap::new();
912    for position in positions {
913        map.insert(position.symbol.clone(), position_signed_qty(&position));
914    }
915    map
916}
917
918fn position_signed_qty(position: &Position) -> Decimal {
919    match position.side {
920        Some(Side::Buy) => position.quantity,
921        Some(Side::Sell) => -position.quantity,
922        None => Decimal::ZERO,
923    }
924}
925
926fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
927    if diff <= Decimal::ZERO {
928        Decimal::ZERO
929    } else {
930        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
931        diff / denominator
932    }
933}
934
935fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
936    let mut payload = serde_json::Map::new();
937    payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
938    payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
939    payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
940    payload.insert(
941        "api_secret".into(),
942        Value::String(exchange.api_secret.clone()),
943    );
944    payload.insert(
945        "category".into(),
946        Value::String(settings.category.as_path().to_string()),
947    );
948    payload.insert(
949        "orderbook_depth".into(),
950        Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
951    );
952    if let Value::Object(extra) = exchange.params.clone() {
953        for (key, value) in extra {
954            payload.insert(key, value);
955        }
956    }
957    Value::Object(payload)
958}
959
960#[derive(Default)]
961struct MarketSnapshot {
962    last_trade: Option<Price>,
963    last_trade_ts: Option<DateTime<Utc>>,
964    last_candle: Option<Candle>,
965}
966
967impl MarketSnapshot {
968    fn price(&self) -> Option<Price> {
969        self.last_trade
970            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
971    }
972}
973
974pub struct ShutdownSignal {
975    flag: Arc<AtomicBool>,
976    notify: Arc<Notify>,
977}
978
979impl ShutdownSignal {
980    pub fn new() -> Self {
981        let flag = Arc::new(AtomicBool::new(false));
982        let notify = Arc::new(Notify::new());
983        let flag_clone = flag.clone();
984        let notify_clone = notify.clone();
985        tokio::spawn(async move {
986            if tokio::signal::ctrl_c().await.is_ok() {
987                flag_clone.store(true, Ordering::SeqCst);
988                notify_clone.notify_waiters();
989            }
990        });
991        Self { flag, notify }
992    }
993
994    pub fn trigger(&self) {
995        self.flag.store(true, Ordering::SeqCst);
996        self.notify.notify_waiters();
997    }
998
999    pub fn triggered(&self) -> bool {
1000        self.flag.load(Ordering::SeqCst)
1001    }
1002
1003    pub async fn wait(&self) {
1004        if self.triggered() {
1005            return;
1006        }
1007        self.notify.notified().await;
1008    }
1009
1010    async fn sleep(&self, duration: Duration) -> bool {
1011        tokio::select! {
1012            _ = tokio::time::sleep(duration) => true,
1013            _ = self.notify.notified() => false,
1014        }
1015    }
1016}
1017
1018impl Default for ShutdownSignal {
1019    fn default() -> Self {
1020        Self::new()
1021    }
1022}
1023
1024impl Clone for ShutdownSignal {
1025    fn clone(&self) -> Self {
1026        Self {
1027            flag: self.flag.clone(),
1028            notify: self.notify.clone(),
1029        }
1030    }
1031}
1032
1033#[allow(clippy::too_many_arguments)]
1034fn spawn_event_subscribers(
1035    bus: Arc<EventBus>,
1036    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1037    strategy_ctx: Arc<Mutex<StrategyContext>>,
1038    orchestrator: Arc<OrderOrchestrator>,
1039    portfolio: Arc<Mutex<Portfolio>>,
1040    metrics: Arc<LiveMetrics>,
1041    alerts: Arc<AlertManager>,
1042    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1043    state_repo: Arc<dyn StateRepository>,
1044    persisted: Arc<Mutex<LiveState>>,
1045    exec_backend: ExecutionBackend,
1046    recorder: Option<RecorderHandle>,
1047    last_data_timestamp: Arc<AtomicI64>,
1048) -> Vec<JoinHandle<()>> {
1049    let mut handles = Vec::new();
1050    let market_recorder = recorder.clone();
1051
1052    let market_bus = bus.clone();
1053    let market_strategy = strategy.clone();
1054    let market_ctx = strategy_ctx.clone();
1055    let market_metrics = metrics.clone();
1056    let market_alerts = alerts.clone();
1057    let market_state = state_repo.clone();
1058    let market_persisted = persisted.clone();
1059    let market_portfolio = portfolio.clone();
1060    let market_snapshot = market.clone();
1061    let orchestrator_clone = orchestrator.clone();
1062    let market_data_tracker = last_data_timestamp.clone();
1063    handles.push(tokio::spawn(async move {
1064        let recorder = market_recorder;
1065        let mut stream = market_bus.subscribe();
1066        loop {
1067            match stream.recv().await {
1068                Ok(Event::Tick(evt)) => {
1069                    if let Some(handle) = recorder.as_ref() {
1070                        handle.record_tick(evt.tick.clone());
1071                    }
1072                    if let Err(err) = process_tick_event(
1073                        evt.tick,
1074                        market_strategy.clone(),
1075                        market_ctx.clone(),
1076                        market_metrics.clone(),
1077                        market_alerts.clone(),
1078                        market_snapshot.clone(),
1079                        market_portfolio.clone(),
1080                        market_state.clone(),
1081                        market_persisted.clone(),
1082                        market_bus.clone(),
1083                        market_data_tracker.clone(),
1084                    )
1085                    .await
1086                    {
1087                        warn!(error = %err, "tick handler failed");
1088                    }
1089                }
1090                Ok(Event::Candle(evt)) => {
1091                    if let Some(handle) = recorder.as_ref() {
1092                        handle.record_candle(evt.candle.clone());
1093                    }
1094                    if let Err(err) = process_candle_event(
1095                        evt.candle,
1096                        market_strategy.clone(),
1097                        market_ctx.clone(),
1098                        market_metrics.clone(),
1099                        market_alerts.clone(),
1100                        market_snapshot.clone(),
1101                        market_portfolio.clone(),
1102                        orchestrator_clone.clone(),
1103                        exec_backend,
1104                        market_state.clone(),
1105                        market_persisted.clone(),
1106                        market_bus.clone(),
1107                        market_data_tracker.clone(),
1108                    )
1109                    .await
1110                    {
1111                        warn!(error = %err, "candle handler failed");
1112                    }
1113                }
1114                Ok(Event::OrderBook(evt)) => {
1115                    if let Err(err) = process_order_book_event(
1116                        evt.order_book,
1117                        market_strategy.clone(),
1118                        market_ctx.clone(),
1119                        market_metrics.clone(),
1120                        market_alerts.clone(),
1121                        market_snapshot.clone(),
1122                        market_bus.clone(),
1123                        market_data_tracker.clone(),
1124                    )
1125                    .await
1126                    {
1127                        warn!(error = %err, "order book handler failed");
1128                    }
1129                }
1130                Ok(_) => {}
1131                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1132                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1133                    warn!(lag = lag, "market subscriber lagged");
1134                    continue;
1135                }
1136            }
1137        }
1138    }));
1139
1140    let exec_bus = bus.clone();
1141    let exec_portfolio = portfolio.clone();
1142    let exec_market = market.clone();
1143    let exec_persisted = persisted.clone();
1144    let exec_alerts = alerts.clone();
1145    let exec_metrics = metrics.clone();
1146    let exec_orchestrator = orchestrator.clone();
1147    handles.push(tokio::spawn(async move {
1148        let orchestrator = exec_orchestrator.clone();
1149        let mut stream = exec_bus.subscribe();
1150        loop {
1151            match stream.recv().await {
1152                Ok(Event::Signal(evt)) => {
1153                    if let Err(err) = process_signal_event(
1154                        evt.signal,
1155                        orchestrator.clone(),
1156                        exec_portfolio.clone(),
1157                        exec_market.clone(),
1158                        exec_persisted.clone(),
1159                        exec_alerts.clone(),
1160                        exec_metrics.clone(),
1161                    )
1162                    .await
1163                    {
1164                        warn!(error = %err, "signal handler failed");
1165                    }
1166                }
1167                Ok(_) => {}
1168                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1169                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1170                    warn!(lag = lag, "signal subscriber lagged");
1171                    continue;
1172                }
1173            }
1174        }
1175    }));
1176
1177    let fill_bus = bus.clone();
1178    let fill_state = state_repo.clone();
1179    let fill_orchestrator = orchestrator.clone();
1180    let fill_persisted = persisted.clone();
1181    let fill_alerts = alerts.clone();
1182    let fill_recorder = recorder.clone();
1183    handles.push(tokio::spawn(async move {
1184        let orchestrator = fill_orchestrator.clone();
1185        let persisted = fill_persisted.clone();
1186        let recorder = fill_recorder;
1187        let mut stream = fill_bus.subscribe();
1188        loop {
1189            match stream.recv().await {
1190                Ok(Event::Fill(evt)) => {
1191                    if let Some(handle) = recorder.as_ref() {
1192                        handle.record_fill(evt.fill.clone());
1193                    }
1194                    if let Err(err) = process_fill_event(
1195                        evt.fill,
1196                        portfolio.clone(),
1197                        strategy.clone(),
1198                        strategy_ctx.clone(),
1199                        orchestrator.clone(),
1200                        metrics.clone(),
1201                        fill_alerts.clone(),
1202                        fill_state.clone(),
1203                        persisted.clone(),
1204                    )
1205                    .await
1206                    {
1207                        warn!(error = %err, "fill handler failed");
1208                    }
1209                }
1210                Ok(_) => {}
1211                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1212                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1213                    warn!(lag = lag, "fill subscriber lagged");
1214                    continue;
1215                }
1216            }
1217        }
1218    }));
1219
1220    let order_bus = bus.clone();
1221    let order_persisted = persisted.clone();
1222    let order_alerts = alerts.clone();
1223    let order_orchestrator = orchestrator.clone();
1224    // Note: We don't pass strategy to order update handler to avoid lock contention
1225    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
1226    let order_recorder = recorder;
1227    handles.push(tokio::spawn(async move {
1228        let orchestrator = order_orchestrator.clone();
1229        let persisted = order_persisted.clone();
1230        let recorder = order_recorder;
1231        let mut stream = order_bus.subscribe();
1232        loop {
1233            match stream.recv().await {
1234                Ok(Event::OrderUpdate(evt)) => {
1235                    if let Some(handle) = recorder.as_ref() {
1236                        handle.record_order(evt.order.clone());
1237                    }
1238                    if let Err(err) = process_order_update_event(
1239                        evt.order,
1240                        orchestrator.clone(),
1241                        order_alerts.clone(),
1242                        state_repo.clone(),
1243                        persisted.clone(),
1244                    )
1245                    .await
1246                    {
1247                        warn!(error = %err, "order update handler failed");
1248                    }
1249                }
1250                Ok(_) => {}
1251                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1252                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1253                    warn!(lag = lag, "order subscriber lagged");
1254                    continue;
1255                }
1256            }
1257        }
1258    }));
1259
1260    handles
1261}
1262
1263#[allow(clippy::too_many_arguments)]
1264async fn process_tick_event(
1265    tick: Tick,
1266    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1267    strategy_ctx: Arc<Mutex<StrategyContext>>,
1268    metrics: Arc<LiveMetrics>,
1269    alerts: Arc<AlertManager>,
1270    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1271    portfolio: Arc<Mutex<Portfolio>>,
1272    state_repo: Arc<dyn StateRepository>,
1273    persisted: Arc<Mutex<LiveState>>,
1274    bus: Arc<EventBus>,
1275    last_data_timestamp: Arc<AtomicI64>,
1276) -> Result<()> {
1277    metrics.inc_tick();
1278    metrics.update_staleness(0.0);
1279    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1280    last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1281    alerts.heartbeat().await;
1282    {
1283        let mut guard = market.lock().await;
1284        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1285            snapshot.last_trade = Some(tick.price);
1286            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1287        }
1288    }
1289    let mut drawdown_triggered = false;
1290    let mut snapshot_on_trigger = None;
1291    {
1292        let mut guard = portfolio.lock().await;
1293        let was_liquidate_only = guard.liquidate_only();
1294        match guard.update_market_data(&tick.symbol, tick.price) {
1295            Ok(_) => {
1296                if !was_liquidate_only && guard.liquidate_only() {
1297                    drawdown_triggered = true;
1298                    snapshot_on_trigger = Some(guard.snapshot());
1299                }
1300            }
1301            Err(err) => {
1302                warn!(
1303                    symbol = %tick.symbol,
1304                    error = %err,
1305                    "failed to refresh market data"
1306                );
1307            }
1308        }
1309    }
1310    {
1311        let mut state = persisted.lock().await;
1312        state.last_prices.insert(tick.symbol.clone(), tick.price);
1313        if drawdown_triggered {
1314            if let Some(snapshot) = snapshot_on_trigger.take() {
1315                state.portfolio = Some(snapshot);
1316            }
1317        }
1318    }
1319    if drawdown_triggered {
1320        persist_state(
1321            state_repo.clone(),
1322            persisted.clone(),
1323            Some(strategy.clone()),
1324        )
1325        .await?;
1326        alert_liquidate_only(alerts.clone()).await;
1327    }
1328    {
1329        let mut ctx = strategy_ctx.lock().await;
1330        ctx.push_tick(tick.clone());
1331        let lock_start = Instant::now();
1332        let mut strat = strategy.lock().await;
1333        log_strategy_lock("tick", lock_start.elapsed());
1334        let call_start = Instant::now();
1335        strat
1336            .on_tick(&ctx, &tick)
1337            .await
1338            .context("strategy failure on tick event")?;
1339        log_strategy_call("tick", call_start.elapsed());
1340    }
1341    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1342    Ok(())
1343}
1344
1345#[allow(clippy::too_many_arguments)]
1346async fn process_candle_event(
1347    candle: Candle,
1348    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1349    strategy_ctx: Arc<Mutex<StrategyContext>>,
1350    metrics: Arc<LiveMetrics>,
1351    alerts: Arc<AlertManager>,
1352    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1353    portfolio: Arc<Mutex<Portfolio>>,
1354    orchestrator: Arc<OrderOrchestrator>,
1355    exec_backend: ExecutionBackend,
1356    state_repo: Arc<dyn StateRepository>,
1357    persisted: Arc<Mutex<LiveState>>,
1358    bus: Arc<EventBus>,
1359    last_data_timestamp: Arc<AtomicI64>,
1360) -> Result<()> {
1361    metrics.inc_candle();
1362    metrics.update_staleness(0.0);
1363    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1364    last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1365    alerts.heartbeat().await;
1366    metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1367    {
1368        let mut guard = market.lock().await;
1369        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1370            snapshot.last_candle = Some(candle.clone());
1371            snapshot.last_trade = Some(candle.close);
1372        }
1373    }
1374    if exec_backend.is_paper() {
1375        let client = orchestrator.execution_engine().client();
1376        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1377            paper.update_price(&candle.symbol, candle.close);
1378        }
1379    }
1380    let mut candle_drawdown_triggered = false;
1381    let mut candle_snapshot = None;
1382    {
1383        let mut guard = portfolio.lock().await;
1384        let was_liquidate_only = guard.liquidate_only();
1385        match guard.update_market_data(&candle.symbol, candle.close) {
1386            Ok(_) => {
1387                if !was_liquidate_only && guard.liquidate_only() {
1388                    candle_drawdown_triggered = true;
1389                    candle_snapshot = Some(guard.snapshot());
1390                }
1391            }
1392            Err(err) => {
1393                warn!(
1394                    symbol = %candle.symbol,
1395                    error = %err,
1396                    "failed to refresh market data"
1397                );
1398            }
1399        }
1400    }
1401    if candle_drawdown_triggered {
1402        if let Some(snapshot) = candle_snapshot.take() {
1403            let mut persisted_guard = persisted.lock().await;
1404            persisted_guard.portfolio = Some(snapshot);
1405        }
1406        alert_liquidate_only(alerts.clone()).await;
1407    }
1408    {
1409        let mut ctx = strategy_ctx.lock().await;
1410        ctx.push_candle(candle.clone());
1411        let lock_start = Instant::now();
1412        let mut strat = strategy.lock().await;
1413        log_strategy_lock("candle", lock_start.elapsed());
1414        let call_start = Instant::now();
1415        strat
1416            .on_candle(&ctx, &candle)
1417            .await
1418            .context("strategy failure on candle event")?;
1419        log_strategy_call("candle", call_start.elapsed());
1420    }
1421    {
1422        let mut snapshot = persisted.lock().await;
1423        snapshot.last_candle_ts = Some(candle.timestamp);
1424        snapshot
1425            .last_prices
1426            .insert(candle.symbol.clone(), candle.close);
1427    }
1428    persist_state(
1429        state_repo.clone(),
1430        persisted.clone(),
1431        Some(strategy.clone()),
1432    )
1433    .await?;
1434    let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1435    orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1436    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1437    Ok(())
1438}
1439
1440#[allow(clippy::too_many_arguments)]
1441async fn process_order_book_event(
1442    book: OrderBook,
1443    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1444    strategy_ctx: Arc<Mutex<StrategyContext>>,
1445    metrics: Arc<LiveMetrics>,
1446    alerts: Arc<AlertManager>,
1447    _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1448    bus: Arc<EventBus>,
1449    last_data_timestamp: Arc<AtomicI64>,
1450) -> Result<()> {
1451    metrics.update_staleness(0.0);
1452    alerts.heartbeat().await;
1453    last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1454    {
1455        let mut ctx = strategy_ctx.lock().await;
1456        ctx.push_order_book(book.clone());
1457        let lock_start = Instant::now();
1458        let mut strat = strategy.lock().await;
1459        log_strategy_lock("order_book", lock_start.elapsed());
1460        let call_start = Instant::now();
1461        strat
1462            .on_order_book(&ctx, &book)
1463            .await
1464            .context("strategy failure on order book")?;
1465        log_strategy_call("order_book", call_start.elapsed());
1466    }
1467    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1468    Ok(())
1469}
1470
1471async fn process_signal_event(
1472    signal: Signal,
1473    orchestrator: Arc<OrderOrchestrator>,
1474    portfolio: Arc<Mutex<Portfolio>>,
1475    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1476    persisted: Arc<Mutex<LiveState>>,
1477    alerts: Arc<AlertManager>,
1478    metrics: Arc<LiveMetrics>,
1479) -> Result<()> {
1480    let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1481    orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1482    match orchestrator.on_signal(&signal, &ctx).await {
1483        Ok(()) => {
1484            alerts.reset_order_failures().await;
1485        }
1486        Err(err) => {
1487            metrics.inc_order_failure();
1488            alerts
1489                .order_failure(&format!("orchestrator error: {err}"))
1490                .await;
1491        }
1492    }
1493    Ok(())
1494}
1495
1496fn log_strategy_lock(event: &str, wait: Duration) {
1497    let wait_ms = wait.as_secs_f64() * 1000.0;
1498    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1499        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1500    } else {
1501        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1502    }
1503}
1504
1505fn log_strategy_call(event: &str, elapsed: Duration) {
1506    let duration_ms = elapsed.as_secs_f64() * 1000.0;
1507    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1508        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1509    } else {
1510        trace!(target: "strategy", event, duration_ms, "strategy call completed");
1511    }
1512}
1513
1514#[allow(clippy::too_many_arguments)]
1515async fn process_fill_event(
1516    fill: Fill,
1517    portfolio: Arc<Mutex<Portfolio>>,
1518    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1519    strategy_ctx: Arc<Mutex<StrategyContext>>,
1520    orchestrator: Arc<OrderOrchestrator>,
1521    metrics: Arc<LiveMetrics>,
1522    alerts: Arc<AlertManager>,
1523    state_repo: Arc<dyn StateRepository>,
1524    persisted: Arc<Mutex<LiveState>>,
1525) -> Result<()> {
1526    let mut drawdown_triggered = false;
1527    {
1528        let mut guard = portfolio.lock().await;
1529        let was_liquidate_only = guard.liquidate_only();
1530        guard
1531            .apply_fill(&fill)
1532            .context("Failed to apply fill to portfolio")?;
1533        if !was_liquidate_only && guard.liquidate_only() {
1534            drawdown_triggered = true;
1535        }
1536        let snapshot = guard.snapshot();
1537        let mut persisted_guard = persisted.lock().await;
1538        persisted_guard.portfolio = Some(snapshot);
1539    }
1540    {
1541        let positions = {
1542            let guard = portfolio.lock().await;
1543            guard.positions()
1544        };
1545        let mut ctx = strategy_ctx.lock().await;
1546        ctx.update_positions(positions);
1547    }
1548    orchestrator.on_fill(&fill).await.ok();
1549    {
1550        let ctx = strategy_ctx.lock().await;
1551        let lock_start = Instant::now();
1552        let mut strat = strategy.lock().await;
1553        log_strategy_lock("fill", lock_start.elapsed());
1554        let call_start = Instant::now();
1555        strat
1556            .on_fill(&ctx, &fill)
1557            .await
1558            .context("Strategy failed on fill event")?;
1559        log_strategy_call("fill", call_start.elapsed());
1560    }
1561    let equity = {
1562        let guard = portfolio.lock().await;
1563        guard.equity()
1564    };
1565    if let Some(value) = equity.to_f64() {
1566        metrics.update_equity(value);
1567    }
1568    alerts.update_equity(equity).await;
1569    metrics.inc_order();
1570    alerts
1571        .notify(
1572            "Order Filled",
1573            &format!(
1574                "order filled: {}@{} ({})",
1575                fill.fill_quantity,
1576                fill.fill_price,
1577                match fill.side {
1578                    Side::Buy => "buy",
1579                    Side::Sell => "sell",
1580                }
1581            ),
1582        )
1583        .await;
1584    if drawdown_triggered {
1585        alert_liquidate_only(alerts.clone()).await;
1586    }
1587    persist_state(
1588        state_repo.clone(),
1589        persisted.clone(),
1590        Some(strategy.clone()),
1591    )
1592    .await?;
1593    Ok(())
1594}
1595
1596async fn process_order_update_event(
1597    order: Order,
1598    orchestrator: Arc<OrderOrchestrator>,
1599    alerts: Arc<AlertManager>,
1600    state_repo: Arc<dyn StateRepository>,
1601    persisted: Arc<Mutex<LiveState>>,
1602) -> Result<()> {
1603    orchestrator.on_order_update(&order);
1604    if matches!(order.status, OrderStatus::Rejected) {
1605        error!(
1606            order_id = %order.id,
1607            symbol = %order.request.symbol,
1608            "order rejected by exchange"
1609        );
1610        alerts.order_failure("order rejected by exchange").await;
1611        alerts
1612            .notify(
1613                "Order rejected",
1614                &format!(
1615                    "Order {} for {} was rejected",
1616                    order.id, order.request.symbol
1617                ),
1618            )
1619            .await;
1620    }
1621    {
1622        let mut snapshot = persisted.lock().await;
1623        let mut found = false;
1624        for existing in &mut snapshot.open_orders {
1625            if existing.id == order.id {
1626                *existing = order.clone();
1627                found = true;
1628                break;
1629            }
1630        }
1631        if !found {
1632            snapshot.open_orders.push(order.clone());
1633        }
1634        if matches!(
1635            order.status,
1636            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1637        ) {
1638            snapshot.open_orders.retain(|o| o.id != order.id);
1639        }
1640    }
1641    persist_state(state_repo, persisted, None).await?;
1642    Ok(())
1643}
1644
1645async fn emit_signals(
1646    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1647    bus: Arc<EventBus>,
1648    metrics: Arc<LiveMetrics>,
1649) {
1650    let signals = {
1651        let mut strat = strategy.lock().await;
1652        strat.drain_signals()
1653    };
1654    if signals.is_empty() {
1655        return;
1656    }
1657    metrics.inc_signals(signals.len());
1658    for signal in signals {
1659        bus.publish(Event::Signal(SignalEvent { signal }));
1660    }
1661}
1662
1663async fn persist_state(
1664    repo: Arc<dyn StateRepository>,
1665    persisted: Arc<Mutex<LiveState>>,
1666    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1667) -> Result<()> {
1668    if let Some(strat_lock) = strategy {
1669        // Snapshot strategy state before cloning the full state for persistence
1670        let strat = strat_lock.lock().await;
1671        if let Ok(json_state) = strat.snapshot() {
1672            let mut guard = persisted.lock().await;
1673            guard.strategy_state = Some(json_state);
1674        } else {
1675            warn!("failed to snapshot strategy state");
1676        }
1677    }
1678
1679    let snapshot = {
1680        let guard = persisted.lock().await;
1681        guard.clone()
1682    };
1683    tokio::task::spawn_blocking(move || repo.save(&snapshot))
1684        .await
1685        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1686        .map_err(|err| anyhow!(err.to_string()))
1687}
1688
1689async fn shared_risk_context(
1690    symbol: &str,
1691    portfolio: &Arc<Mutex<Portfolio>>,
1692    market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1693    persisted: &Arc<Mutex<LiveState>>,
1694) -> RiskContext {
1695    let (signed_qty, equity, liquidate_only) = {
1696        let guard = portfolio.lock().await;
1697        (
1698            guard.signed_position_qty(symbol),
1699            guard.equity(),
1700            guard.liquidate_only(),
1701        )
1702    };
1703    let observed_price = {
1704        let guard = market.lock().await;
1705        guard.get(symbol).and_then(|snapshot| snapshot.price())
1706    };
1707    let last_price = if let Some(price) = observed_price {
1708        price
1709    } else {
1710        let guard = persisted.lock().await;
1711        guard
1712            .last_prices
1713            .get(symbol)
1714            .copied()
1715            .unwrap_or(Decimal::ZERO)
1716    };
1717    RiskContext {
1718        signed_position_qty: signed_qty,
1719        portfolio_equity: equity,
1720        last_price,
1721        liquidate_only,
1722    }
1723}
1724
1725async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1726    alerts
1727        .notify(
1728            "Max drawdown triggered",
1729            "Portfolio entered liquidate-only mode; new exposure blocked until review",
1730        )
1731        .await;
1732}
1733
1734fn spawn_connection_monitor(
1735    shutdown: ShutdownSignal,
1736    flag: Arc<AtomicBool>,
1737    metrics: Arc<LiveMetrics>,
1738    stream: &'static str,
1739) -> JoinHandle<()> {
1740    tokio::spawn(async move {
1741        loop {
1742            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1743            if !shutdown.sleep(Duration::from_secs(5)).await {
1744                break;
1745            }
1746        }
1747    })
1748}
1749
1750fn spawn_order_timeout_monitor(
1751    orchestrator: Arc<OrderOrchestrator>,
1752    bus: Arc<EventBus>,
1753    alerts: Arc<AlertManager>,
1754    shutdown: ShutdownSignal,
1755) -> JoinHandle<()> {
1756    tokio::spawn(async move {
1757        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1758        loop {
1759            ticker.tick().await;
1760            if shutdown.triggered() {
1761                break;
1762            }
1763            match orchestrator.poll_stale_orders().await {
1764                Ok(updates) => {
1765                    for order in updates {
1766                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1767                            let message = format!(
1768                                "Order {} for {} timed out after {}s",
1769                                order.id,
1770                                order.request.symbol,
1771                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1772                            );
1773                            error!(%message);
1774                            alerts.order_failure(&message).await;
1775                            alerts.notify("Order timeout", &message).await;
1776                        }
1777                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1778                    }
1779                }
1780                Err(err) => {
1781                    warn!(error = %err, "order timeout monitor failed");
1782                }
1783            }
1784        }
1785    })
1786}
1787
1788async fn load_market_registry(
1789    client: Arc<dyn ExecutionClient>,
1790    settings: &LiveSessionSettings,
1791) -> Result<Arc<MarketRegistry>> {
1792    if let Some(path) = &settings.markets_file {
1793        let registry = MarketRegistry::load_from_file(path)
1794            .with_context(|| format!("failed to load markets from {}", path.display()))?;
1795        return Ok(Arc::new(registry));
1796    }
1797
1798    if settings.exec_backend.is_paper() {
1799        return Err(anyhow!(
1800            "paper execution requires --markets-file when exchange metadata is unavailable"
1801        ));
1802    }
1803
1804    let instruments = client
1805        .list_instruments(settings.category.as_path())
1806        .await
1807        .context("failed to fetch instruments from execution client")?;
1808    let registry =
1809        MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1810    Ok(Arc::new(registry))
1811}
1812
/// Replays exchange state after a (re)connect of the Bybit private stream.
///
/// Open orders are listed for every symbol and forwarded as order updates. When
/// the execution client is a [`BybitClient`], executions since the last recorded
/// sync (or the last 30 minutes when none is recorded) are forwarded as fills.
///
/// `last_sync` is only advanced after the execution query succeeded, and it is
/// set to the instant taken just before the query, so a failed or slow query is
/// covered again by the next reconciliation.
#[cfg(feature = "bybit")]
async fn reconcile_bybit_private_state(
    exec_client: &Arc<dyn ExecutionClient>,
    symbols: &[String],
    last_sync: &tokio::sync::Mutex<Option<DateTime<Utc>>>,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    for symbol in symbols {
        match exec_client.list_open_orders(symbol).await {
            Ok(orders) => {
                for order in orders {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send reconciled order update: {err}");
                    }
                }
            }
            Err(e) => {
                error!("failed to reconcile open orders for {symbol}: {e}");
            }
        }
    }

    let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() else {
        return;
    };
    let since = {
        let guard = last_sync.lock().await;
        guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
    };
    // Taken before the request so the next window overlaps this one.
    let sync_started = Utc::now();
    match bybit.list_executions_since(since).await {
        Ok(fills) => {
            for fill in fills {
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send reconciled fill: {err}");
                }
            }
            *last_sync.lock().await = Some(sync_started);
        }
        Err(e) => {
            // Leave `last_sync` untouched so this window is retried on the next connect.
            error!("failed to reconcile executions since {:?}: {}", since, e);
        }
    }
}

/// Decodes one text frame from the Bybit private stream and forwards its content.
///
/// Frames that are not JSON or carry no `topic` are ignored. `order` frames are
/// forwarded as order updates and `execution` frames as fills; other topics are
/// ignored. Payloads that fail to decode or convert are logged and skipped.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_frame(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return;
    };
    let Some(topic) = value.get("topic").and_then(|v| v.as_str()) else {
        return;
    };
    match topic {
        "order" => {
            match serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value.clone()) {
                Ok(msg) => {
                    for update in msg.data {
                        match update.to_tesser_order(None) {
                            Ok(order) => {
                                if let Err(err) =
                                    private_tx.send(BrokerEvent::OrderUpdate(order)).await
                                {
                                    error!("failed to send private order update: {err}");
                                }
                            }
                            Err(err) => {
                                warn!("failed to convert Bybit order update: {err:?}");
                            }
                        }
                    }
                }
                Err(err) => {
                    warn!("failed to decode Bybit order message: {err}");
                }
            }
        }
        "execution" => {
            match serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value.clone()) {
                Ok(msg) => {
                    for exec in msg.data {
                        match exec.to_tesser_fill() {
                            Ok(fill) => {
                                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                                    error!("failed to send private fill event: {err}");
                                }
                            }
                            Err(err) => {
                                warn!("failed to convert Bybit execution: {err:?}");
                            }
                        }
                    }
                }
                Err(err) => {
                    warn!("failed to decode Bybit execution message: {err}");
                }
            }
        }
        _ => {}
    }
}

/// Spawns the detached task that maintains the Bybit private WebSocket stream.
///
/// Each iteration connects, marks the stream connected (flag and `"private"`
/// metric), reconciles open orders and recent executions, and then forwards
/// `order` / `execution` frames to `private_tx` until the socket ends or shutdown
/// is signalled. After every iteration, whether the connect failed or the stream
/// ended, the stream is marked disconnected and the task waits five seconds
/// before reconnecting. Both the read loop and the wait stop on shutdown.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");

                    reconcile_bybit_private_state(&exec_client, &symbols, &last_sync, &private_tx)
                        .await;

                    loop {
                        // Race the next frame against shutdown so an idle socket
                        // cannot delay termination.
                        let next = tokio::select! {
                            _ = shutdown.wait() => break,
                            next = socket.next() => next,
                        };
                        let Some(msg) = next else {
                            warn!("Bybit private WebSocket stream ended; reconnecting");
                            break;
                        };
                        if shutdown.triggered() {
                            break;
                        }
                        match msg {
                            Ok(Message::Text(text)) => {
                                forward_bybit_private_frame(&text, &private_tx).await;
                            }
                            Ok(_) => {}
                            Err(err) => {
                                warn!("Bybit private WebSocket read error: {err}");
                            }
                        }
                    }
                }
                Err(e) => {
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                }
            }

            // Reached after a failed connect and after a stream that ended: either
            // way the private stream is down now.
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);

            if shutdown.triggered() || !shutdown.sleep(RETRY_DELAY).await {
                break;
            }
        }
    });
}
1950
/// Spawns the detached task that maintains the Binance user-data stream.
///
/// The task exits immediately (with a warning) when `exec_client` is not a
/// [`BinanceClient`]. Otherwise each iteration:
///
/// 1. requests a listen key and connects the user-data stream,
/// 2. marks the stream connected (flag and `"private"` metric),
/// 3. forwards order updates and fills to `private_tx` from the event callback,
/// 4. keeps the listen key alive every 30 minutes on a helper task, and
/// 5. waits until the listen key expires or shutdown is signalled.
///
/// After every iteration the stream is marked disconnected. Retries are spaced
/// five seconds apart, and every retry path stops once shutdown is signalled.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    const RETRY_DELAY: Duration = Duration::from_secs(5);
    const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30 * 60);

    tokio::spawn(async move {
        // The concrete client type cannot change between iterations, so resolve it once.
        let Some(binance) = exec_client
            .as_ref()
            .as_any()
            .downcast_ref::<BinanceClient>()
        else {
            warn!("execution client is not Binance");
            return;
        };

        loop {
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // Stop retrying once shutdown has been requested.
                    if shutdown.triggered() || !shutdown.sleep(RETRY_DELAY).await {
                        break;
                    }
                    continue;
                }
            };

            let shutting_down = match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);

                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    // NOTE(review): `blocking_send` panics when called from inside an
                    // async execution context — confirm the SDK invokes this callback
                    // off the runtime.
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            // Capacity 1 is enough: a single pending request already
                            // triggers the reconnect.
                            let _ = reconnect_tx.try_send(());
                        }
                    });

                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(KEEPALIVE_INTERVAL);
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if let Err(err) = client.keepalive_user_stream().await {
                                warn!("Binance user stream keepalive failed: {err:?}");
                                break;
                            }
                        }
                    });

                    let shutting_down = tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                            false
                        }
                        _ = shutdown.wait() => true,
                    };
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                    shutting_down
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                    false
                }
            };

            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);

            if shutting_down || shutdown.triggered() || !shutdown.sleep(RETRY_DELAY).await {
                break;
            }
        }
    });
}