tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, Ordering},
6    Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23    static INIT: Once = Once::new();
24    INIT.call_once(|| {
25        register_connector_factory(Arc::new(PaperFactory::default()));
26        #[cfg(feature = "bybit")]
27        register_bybit_factory();
28        #[cfg(feature = "binance")]
29        register_binance_factory();
30    });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35    fill_from_update, order_from_update, register_factory as register_binance_factory,
36    ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37    BinanceClient,
38};
39use tesser_broker::{
40    get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41    ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
48use tesser_core::{
49    Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
50    Symbol, Tick,
51};
52use tesser_events::{
53    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
54    TickEvent,
55};
56use tesser_execution::{
57    BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
58    RiskContext, RiskLimits, SqliteAlgoStateRepository,
59};
60use tesser_markets::MarketRegistry;
61use tesser_paper::{PaperExecutionClient, PaperFactory};
62use tesser_portfolio::{
63    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
64};
65use tesser_strategy::{Strategy, StrategyContext};
66
67use crate::alerts::{AlertDispatcher, AlertManager};
68use crate::telemetry::{spawn_metrics_server, LiveMetrics};
69use crate::PublicChannel;
70
71/// Unified event type for asynchronous updates from the broker.
72#[derive(Debug)]
73pub enum BrokerEvent {
74    OrderUpdate(Order),
75    Fill(Fill),
76}
77
78#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
79#[value(rename_all = "kebab-case")]
80pub enum ExecutionBackend {
81    Paper,
82    Live,
83}
84
85impl ExecutionBackend {
86    fn is_paper(self) -> bool {
87        matches!(self, Self::Paper)
88    }
89}
90
/// Fallback value for the order-book depth setting.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns [`DEFAULT_ORDER_BOOK_DEPTH`]; `const` so it can be used in constant contexts.
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}

/// Strategy-lock wait time considered slow (25 ms).
/// NOTE(review): consumers live outside this view.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_micros(25_000);
/// Strategy-callback duration considered slow (250 ms).
/// NOTE(review): consumers live outside this view.
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_micros(250_000);
98
99#[async_trait::async_trait]
100trait LiveMarketStream: Send {
101    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
102    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
103    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
104}
105
106struct FactoryStreamAdapter {
107    inner: Box<dyn ConnectorStream>,
108}
109
110impl FactoryStreamAdapter {
111    fn new(inner: Box<dyn ConnectorStream>) -> Self {
112        Self { inner }
113    }
114}
115
116#[async_trait::async_trait]
117impl LiveMarketStream for FactoryStreamAdapter {
118    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
119        self.inner.next_tick().await
120    }
121
122    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
123        self.inner.next_candle().await
124    }
125
126    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
127        self.inner.next_order_book().await
128    }
129}
130
131pub struct LiveSessionSettings {
132    pub category: PublicChannel,
133    pub interval: Interval,
134    pub quantity: Quantity,
135    pub slippage_bps: Decimal,
136    pub fee_bps: Decimal,
137    pub history: usize,
138    pub metrics_addr: SocketAddr,
139    pub state_path: PathBuf,
140    pub initial_balances: HashMap<Symbol, Decimal>,
141    pub reporting_currency: Symbol,
142    pub markets_file: Option<PathBuf>,
143    pub alerting: AlertingConfig,
144    pub exec_backend: ExecutionBackend,
145    pub risk: RiskManagementConfig,
146    pub reconciliation_interval: Duration,
147    pub reconciliation_threshold: Decimal,
148    pub driver: String,
149    pub orderbook_depth: usize,
150}
151
152impl LiveSessionSettings {
153    fn risk_limits(&self) -> RiskLimits {
154        RiskLimits {
155            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
156            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
157        }
158    }
159}
160
161pub async fn run_live(
162    strategy: Box<dyn Strategy>,
163    symbols: Vec<String>,
164    exchange: ExchangeConfig,
165    settings: LiveSessionSettings,
166) -> Result<()> {
167    run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
168}
169
170/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
171pub async fn run_live_with_shutdown(
172    strategy: Box<dyn Strategy>,
173    symbols: Vec<String>,
174    exchange: ExchangeConfig,
175    settings: LiveSessionSettings,
176    shutdown: ShutdownSignal,
177) -> Result<()> {
178    if symbols.is_empty() {
179        return Err(anyhow!("strategy did not declare any subscriptions"));
180    }
181    if settings.quantity <= Decimal::ZERO {
182        return Err(anyhow!("--quantity must be positive"));
183    }
184
185    let public_connection = Arc::new(AtomicBool::new(false));
186    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
187        Some(Arc::new(AtomicBool::new(false)))
188    } else {
189        None
190    };
191    ensure_builtin_connectors_registered();
192    let connector_payload = build_exchange_payload(&exchange, &settings);
193    let connector_factory = get_connector_factory(&settings.driver)
194        .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
195    let stream_config = ConnectorStreamConfig {
196        ws_url: Some(exchange.ws_url.clone()),
197        metadata: json!({
198            "category": settings.category.as_path(),
199            "symbols": symbols.clone(),
200            "orderbook_depth": settings.orderbook_depth,
201        }),
202        connection_status: Some(public_connection.clone()),
203    };
204    let mut connector_stream = connector_factory
205        .create_market_stream(&connector_payload, stream_config)
206        .await
207        .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
208    connector_stream
209        .subscribe(&symbols, settings.interval)
210        .await
211        .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
212    let market_stream: Box<dyn LiveMarketStream> =
213        Box::new(FactoryStreamAdapter::new(connector_stream));
214
215    let execution_client =
216        build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
217    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
218    if matches!(settings.exec_backend, ExecutionBackend::Live) {
219        info!(
220            rest = %exchange.rest_url,
221            driver = ?settings.driver,
222            "live execution enabled via {:?} REST",
223            settings.driver
224        );
225    }
226    let risk_checker: Arc<dyn PreTradeRiskChecker> =
227        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
228    let execution = ExecutionEngine::new(
229        execution_client.clone(),
230        Box::new(FixedOrderSizer {
231            quantity: settings.quantity,
232        }),
233        risk_checker,
234    );
235
236    // Create algorithm state repository
237    let algo_repo_path = settings.state_path.with_extension("algos.db");
238    let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
239
240    // Create orchestrator with execution engine
241    let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
242
243    let runtime = LiveRuntime::new(
244        market_stream,
245        strategy,
246        symbols,
247        orchestrator,
248        settings,
249        exchange.ws_url.clone(),
250        market_registry,
251        shutdown,
252        public_connection,
253        private_connection,
254    )
255    .await?;
256    runtime.run().await
257}
258
259async fn build_execution_client(
260    settings: &LiveSessionSettings,
261    connector_factory: Arc<dyn ConnectorFactory>,
262    connector_payload: &Value,
263) -> Result<Arc<dyn ExecutionClient>> {
264    match settings.exec_backend {
265        ExecutionBackend::Paper => {
266            if settings.driver == "paper" {
267                return connector_factory
268                    .create_execution_client(connector_payload)
269                    .await
270                    .map_err(|err| anyhow!("failed to create execution client: {err}"));
271            }
272            Ok(Arc::new(PaperExecutionClient::new(
273                "paper".to_string(),
274                vec!["BTCUSDT".to_string()],
275                settings.slippage_bps,
276                settings.fee_bps,
277            )))
278        }
279        ExecutionBackend::Live => connector_factory
280            .create_execution_client(connector_payload)
281            .await
282            .map_err(|err| anyhow!("failed to create execution client: {err}")),
283    }
284}
285
286struct LiveRuntime {
287    market: Box<dyn LiveMarketStream>,
288    orchestrator: Arc<OrderOrchestrator>,
289    state_repo: Arc<dyn StateRepository>,
290    persisted: Arc<Mutex<LiveState>>,
291    event_bus: Arc<EventBus>,
292    shutdown: ShutdownSignal,
293    metrics_task: JoinHandle<()>,
294    alert_task: Option<JoinHandle<()>>,
295    reconciliation_task: Option<JoinHandle<()>>,
296    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
297    private_event_rx: mpsc::Receiver<BrokerEvent>,
298    #[allow(dead_code)]
299    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
300    subscriber_handles: Vec<JoinHandle<()>>,
301    connection_monitors: Vec<JoinHandle<()>>,
302    order_timeout_task: JoinHandle<()>,
303    strategy: Arc<Mutex<Box<dyn Strategy>>>,
304    _public_connection: Arc<AtomicBool>,
305    _private_connection: Option<Arc<AtomicBool>>,
306}
307
308impl LiveRuntime {
309    #[allow(clippy::too_many_arguments)]
310    async fn new(
311        market: Box<dyn LiveMarketStream>,
312        mut strategy: Box<dyn Strategy>,
313        symbols: Vec<String>,
314        orchestrator: OrderOrchestrator,
315        settings: LiveSessionSettings,
316        #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
317        market_registry: Arc<MarketRegistry>,
318        shutdown: ShutdownSignal,
319        public_connection: Arc<AtomicBool>,
320        private_connection: Option<Arc<AtomicBool>>,
321    ) -> Result<Self> {
322        let mut strategy_ctx = StrategyContext::new(settings.history);
323        let state_repo: Arc<dyn StateRepository> =
324            Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
325        let mut persisted = match tokio::task::spawn_blocking({
326            let repo = state_repo.clone();
327            move || repo.load()
328        })
329        .await
330        {
331            Ok(Ok(state)) => state,
332            Ok(Err(err)) => {
333                warn!(error = %err, "failed to load live state; starting from defaults");
334                LiveState::default()
335            }
336            Err(err) => {
337                warn!(error = %err, "state load task failed; starting from defaults");
338                LiveState::default()
339            }
340        };
341        let mut live_bootstrap = None;
342        if matches!(settings.exec_backend, ExecutionBackend::Live) {
343            info!("Synchronizing portfolio state from exchange");
344            let execution_client = orchestrator.execution_engine().client();
345            let positions = execution_client
346                .positions()
347                .await
348                .context("failed to fetch remote positions")?;
349            let balances = execution_client
350                .account_balances()
351                .await
352                .context("failed to fetch remote account balances")?;
353            let mut open_orders = Vec::new();
354            for symbol in &symbols {
355                let mut symbol_orders = execution_client
356                    .list_open_orders(symbol)
357                    .await
358                    .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
359                open_orders.append(&mut symbol_orders);
360            }
361            persisted.open_orders = open_orders;
362            live_bootstrap = Some((positions, balances));
363        }
364
365        let portfolio_cfg = PortfolioConfig {
366            initial_balances: settings.initial_balances.clone(),
367            reporting_currency: settings.reporting_currency.clone(),
368            max_drawdown: Some(settings.risk.max_drawdown),
369        };
370        let portfolio = if let Some((positions, balances)) = live_bootstrap {
371            Portfolio::from_exchange_state(
372                positions,
373                balances,
374                portfolio_cfg.clone(),
375                market_registry.clone(),
376            )
377        } else if let Some(snapshot) = persisted.portfolio.take() {
378            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
379        } else {
380            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
381        };
382        strategy_ctx.update_positions(portfolio.positions());
383        // Restore strategy state if found in persistence
384        if let Some(state) = persisted.strategy_state.take() {
385            info!("restoring strategy state from persistence");
386            strategy
387                .restore(state)
388                .context("failed to restore strategy state")?;
389        }
390        persisted.portfolio = Some(portfolio.snapshot());
391
392        let mut market_snapshots = HashMap::new();
393        for symbol in &symbols {
394            let mut snapshot = MarketSnapshot::default();
395            if let Some(price) = persisted.last_prices.get(symbol).copied() {
396                snapshot.last_trade = Some(price);
397            }
398            market_snapshots.insert(symbol.clone(), snapshot);
399        }
400
401        let metrics = LiveMetrics::new();
402        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
403        if let Some(flag) = &private_connection {
404            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
405        }
406        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
407        let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
408        let alerts = AlertManager::new(
409            settings.alerting,
410            dispatcher,
411            Some(public_connection.clone()),
412            private_connection.clone(),
413        );
414        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
415        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
416        let alerts = Arc::new(alerts);
417        let alert_task = alerts.spawn_watchdog();
418        let metrics = Arc::new(metrics);
419        let mut connection_monitors = Vec::new();
420        connection_monitors.push(spawn_connection_monitor(
421            shutdown.clone(),
422            public_connection.clone(),
423            metrics.clone(),
424            "public",
425        ));
426        if let Some(flag) = private_connection.clone() {
427            connection_monitors.push(spawn_connection_monitor(
428                shutdown.clone(),
429                flag,
430                metrics.clone(),
431                "private",
432            ));
433        }
434
435        if !settings.exec_backend.is_paper() {
436            let execution_engine = orchestrator.execution_engine();
437            let exec_client = execution_engine.client();
438            match settings.driver.as_str() {
439                "bybit" | "" => {
440                    #[cfg(feature = "bybit")]
441                    {
442                        let bybit = exec_client
443                            .as_ref()
444                            .as_any()
445                            .downcast_ref::<BybitClient>()
446                            .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
447                        let creds = bybit
448                            .get_credentials()
449                            .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
450                        spawn_bybit_private_stream(
451                            creds,
452                            bybit.get_ws_url(),
453                            private_event_tx.clone(),
454                            exec_client.clone(),
455                            symbols.clone(),
456                            last_private_sync.clone(),
457                            private_connection.clone(),
458                            metrics.clone(),
459                            shutdown.clone(),
460                        );
461                    }
462                    #[cfg(not(feature = "bybit"))]
463                    {
464                        bail!("driver 'bybit' is unavailable without the 'bybit' feature");
465                    }
466                }
467                "binance" => {
468                    #[cfg(feature = "binance")]
469                    {
470                        spawn_binance_private_stream(
471                            exec_client.clone(),
472                            exchange_ws_url.clone(),
473                            private_event_tx.clone(),
474                            private_connection.clone(),
475                            metrics.clone(),
476                            shutdown.clone(),
477                        );
478                    }
479                    #[cfg(not(feature = "binance"))]
480                    {
481                        bail!("driver 'binance' is unavailable without the 'binance' feature");
482                    }
483                }
484                "paper" => {}
485                other => {
486                    bail!("private stream unsupported for driver '{other}'");
487                }
488            }
489        }
490
491        let strategy = Arc::new(Mutex::new(strategy));
492        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
493        let portfolio = Arc::new(Mutex::new(portfolio));
494        let market_cache = Arc::new(Mutex::new(market_snapshots));
495        let persisted = Arc::new(Mutex::new(persisted));
496        let orchestrator = Arc::new(orchestrator);
497        let event_bus = Arc::new(EventBus::new(2048));
498        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
499            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
500                client: orchestrator.execution_engine().client(),
501                portfolio: portfolio.clone(),
502                persisted: persisted.clone(),
503                state_repo: state_repo.clone(),
504                alerts: alerts.clone(),
505                metrics: metrics.clone(),
506                reporting_currency: settings.reporting_currency.clone(),
507                threshold: settings.reconciliation_threshold,
508            }))
509        });
510        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
511            spawn_reconciliation_loop(
512                ctx.clone(),
513                shutdown.clone(),
514                settings.reconciliation_interval,
515            )
516        });
517        let subscriber_handles = spawn_event_subscribers(
518            event_bus.clone(),
519            strategy.clone(),
520            strategy_ctx.clone(),
521            orchestrator.clone(),
522            portfolio.clone(),
523            metrics.clone(),
524            alerts.clone(),
525            market_cache.clone(),
526            state_repo.clone(),
527            persisted.clone(),
528            settings.exec_backend,
529        );
530        let order_timeout_task = spawn_order_timeout_monitor(
531            orchestrator.clone(),
532            event_bus.clone(),
533            alerts.clone(),
534            shutdown.clone(),
535        );
536
537        info!(
538            symbols = ?symbols,
539            category = ?settings.category,
540            metrics_addr = %settings.metrics_addr,
541            state_path = %settings.state_path.display(),
542            history = settings.history,
543            "market stream ready"
544        );
545
546        for symbol in &symbols {
547            let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
548            orchestrator.update_risk_context(symbol.clone(), ctx);
549        }
550
551        Ok(Self {
552            market,
553            orchestrator,
554            state_repo,
555            persisted,
556            event_bus,
557            shutdown,
558            metrics_task,
559            alert_task,
560            reconciliation_task,
561            reconciliation_ctx,
562            private_event_rx,
563            last_private_sync,
564            subscriber_handles,
565            connection_monitors,
566            order_timeout_task,
567            strategy,
568            _public_connection: public_connection,
569            _private_connection: private_connection,
570        })
571    }
572
573    async fn run(mut self) -> Result<()> {
574        info!("live session started");
575        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
576            perform_state_reconciliation(ctx.as_ref())
577                .await
578                .context("initial state reconciliation failed")?;
579        }
580        let backoff = Duration::from_millis(200);
581        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
582
583        while !self.shutdown.triggered() {
584            let mut progressed = false;
585
586            if let Some(tick) = self.market.next_tick().await? {
587                progressed = true;
588                self.event_bus.publish(Event::Tick(TickEvent { tick }));
589            }
590
591            if let Some(candle) = self.market.next_candle().await? {
592                progressed = true;
593                self.event_bus
594                    .publish(Event::Candle(CandleEvent { candle }));
595            }
596
597            if let Some(book) = self.market.next_order_book().await? {
598                progressed = true;
599                self.event_bus
600                    .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
601            }
602
603            tokio::select! {
604                biased;
605                Some(event) = self.private_event_rx.recv() => {
606                    progressed = true;
607                    match event {
608                        BrokerEvent::OrderUpdate(order) => {
609                            info!(
610                                order_id = %order.id,
611                                status = ?order.status,
612                                symbol = %order.request.symbol,
613                                "received private order update"
614                            );
615                            self.event_bus
616                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
617                        }
618                        BrokerEvent::Fill(fill) => {
619                            info!(
620                                order_id = %fill.order_id,
621                                symbol = %fill.symbol,
622                                qty = %fill.fill_quantity,
623                                price = %fill.fill_price,
624                                "received private fill"
625                            );
626                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
627                        }
628                    }
629                }
630                _ = orchestrator_timer.tick() => {
631                    // Drive TWAP and other time-based algorithms
632                    if let Err(e) = self.orchestrator.on_timer_tick().await {
633                        error!("Orchestrator timer tick failed: {}", e);
634                    }
635                }
636                else => {}
637            }
638
639            if !progressed && !self.shutdown.sleep(backoff).await {
640                break;
641            }
642        }
643        info!("live session stopping");
644        self.metrics_task.abort();
645        if let Some(handle) = self.alert_task.take() {
646            handle.abort();
647        }
648        if let Some(handle) = self.reconciliation_task.take() {
649            handle.abort();
650        }
651        self.order_timeout_task.abort();
652        for handle in self.subscriber_handles.drain(..) {
653            handle.abort();
654        }
655        for handle in self.connection_monitors.drain(..) {
656            handle.abort();
657        }
658        if let Err(err) = persist_state(
659            self.state_repo.clone(),
660            self.persisted.clone(),
661            Some(self.strategy.clone()),
662        )
663        .await
664        {
665            warn!(error = %err, "failed to persist shutdown state");
666        }
667        Ok(())
668    }
669}
670
671struct ReconciliationContext {
672    client: Arc<dyn ExecutionClient>,
673    portfolio: Arc<Mutex<Portfolio>>,
674    persisted: Arc<Mutex<LiveState>>,
675    state_repo: Arc<dyn StateRepository>,
676    alerts: Arc<AlertManager>,
677    metrics: Arc<LiveMetrics>,
678    reporting_currency: Symbol,
679    threshold: Decimal,
680}
681
682struct ReconciliationContextConfig {
683    client: Arc<dyn ExecutionClient>,
684    portfolio: Arc<Mutex<Portfolio>>,
685    persisted: Arc<Mutex<LiveState>>,
686    state_repo: Arc<dyn StateRepository>,
687    alerts: Arc<AlertManager>,
688    metrics: Arc<LiveMetrics>,
689    reporting_currency: Symbol,
690    threshold: Decimal,
691}
692
693impl ReconciliationContext {
694    fn new(config: ReconciliationContextConfig) -> Self {
695        let ReconciliationContextConfig {
696            client,
697            portfolio,
698            persisted,
699            state_repo,
700            alerts,
701            metrics,
702            reporting_currency,
703            threshold,
704        } = config;
705        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
706        let threshold = if threshold <= Decimal::ZERO {
707            min_threshold
708        } else {
709            threshold
710        };
711        Self {
712            client,
713            portfolio,
714            persisted,
715            state_repo,
716            alerts,
717            metrics,
718            reporting_currency,
719            threshold,
720        }
721    }
722}
723
724fn spawn_reconciliation_loop(
725    ctx: Arc<ReconciliationContext>,
726    shutdown: ShutdownSignal,
727    interval: Duration,
728) -> JoinHandle<()> {
729    tokio::spawn(async move {
730        while shutdown.sleep(interval).await {
731            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
732                error!(error = %err, "periodic state reconciliation failed");
733            }
734        }
735    })
736}
737
738async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
739    info!("running state reconciliation");
740    let remote_positions = ctx
741        .client
742        .positions()
743        .await
744        .context("failed to fetch remote positions")?;
745    let remote_balances = ctx
746        .client
747        .account_balances()
748        .await
749        .context("failed to fetch remote balances")?;
750    let (local_positions, local_cash) = {
751        let guard = ctx.portfolio.lock().await;
752        (guard.positions(), guard.cash())
753    };
754
755    let remote_map = positions_to_map(remote_positions);
756    let local_map = positions_to_map(local_positions);
757    let mut tracked_symbols: HashSet<String> = HashSet::new();
758    tracked_symbols.extend(remote_map.keys().cloned());
759    tracked_symbols.extend(local_map.keys().cloned());
760
761    let mut severe_findings = Vec::new();
762    for symbol in tracked_symbols {
763        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
764        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
765        let diff = (local_qty - remote_qty).abs();
766        let diff_value = diff.to_f64().unwrap_or(0.0);
767        ctx.metrics.update_position_diff(&symbol, diff_value);
768        if diff > Decimal::ZERO {
769            warn!(
770                symbol = %symbol,
771                local = %local_qty,
772                remote = %remote_qty,
773                diff = %diff,
774                "position mismatch detected during reconciliation"
775            );
776            let pct = normalize_diff(diff, remote_qty);
777            if pct >= ctx.threshold {
778                error!(
779                    symbol = %symbol,
780                    local = %local_qty,
781                    remote = %remote_qty,
782                    diff = %diff,
783                    pct = %pct,
784                    "position mismatch exceeds threshold"
785                );
786                severe_findings.push(format!(
787                    "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
788                ));
789            }
790        }
791    }
792
793    let reporting = ctx.reporting_currency.as_str();
794    let remote_cash = remote_balances
795        .iter()
796        .find(|balance| balance.currency == reporting)
797        .map(|balance| balance.available)
798        .unwrap_or_else(|| Decimal::ZERO);
799    let cash_diff = (remote_cash - local_cash).abs();
800    ctx.metrics
801        .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
802    if cash_diff > Decimal::ZERO {
803        warn!(
804            currency = %reporting,
805            local = %local_cash,
806            remote = %remote_cash,
807            diff = %cash_diff,
808            "balance mismatch detected during reconciliation"
809        );
810        let pct = normalize_diff(cash_diff, remote_cash);
811        if pct >= ctx.threshold {
812            error!(
813                currency = %reporting,
814                local = %local_cash,
815                remote = %remote_cash,
816                diff = %cash_diff,
817                pct = %pct,
818                "balance mismatch exceeds threshold"
819            );
820            severe_findings.push(format!(
821                "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
822            ));
823        }
824    }
825
826    if severe_findings.is_empty() {
827        info!("state reconciliation complete with no critical divergence");
828        return Ok(());
829    }
830
831    let alert_body = severe_findings.join("; ");
832    ctx.alerts
833        .notify("State reconciliation divergence", &alert_body)
834        .await;
835    enforce_liquidate_only(ctx).await;
836    Ok(())
837}
838
839async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
840    let snapshot = {
841        let mut guard = ctx.portfolio.lock().await;
842        if !guard.set_liquidate_only(true) {
843            return;
844        }
845        info!("entering liquidate-only mode due to reconciliation divergence");
846        guard.snapshot()
847    };
848    {
849        let mut state = ctx.persisted.lock().await;
850        state.portfolio = Some(snapshot);
851    }
852    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
853        warn!(error = %err, "failed to persist liquidate-only transition");
854    }
855}
856
857fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
858    let mut map = HashMap::new();
859    for position in positions {
860        map.insert(position.symbol.clone(), position_signed_qty(&position));
861    }
862    map
863}
864
865fn position_signed_qty(position: &Position) -> Decimal {
866    match position.side {
867        Some(Side::Buy) => position.quantity,
868        Some(Side::Sell) => -position.quantity,
869        None => Decimal::ZERO,
870    }
871}
872
873fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
874    if diff <= Decimal::ZERO {
875        Decimal::ZERO
876    } else {
877        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
878        diff / denominator
879    }
880}
881
882fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
883    let mut payload = serde_json::Map::new();
884    payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
885    payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
886    payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
887    payload.insert(
888        "api_secret".into(),
889        Value::String(exchange.api_secret.clone()),
890    );
891    payload.insert(
892        "category".into(),
893        Value::String(settings.category.as_path().to_string()),
894    );
895    payload.insert(
896        "orderbook_depth".into(),
897        Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
898    );
899    if let Value::Object(extra) = exchange.params.clone() {
900        for (key, value) in extra {
901            payload.insert(key, value);
902        }
903    }
904    Value::Object(payload)
905}
906
907#[derive(Default)]
908struct MarketSnapshot {
909    last_trade: Option<Price>,
910    last_trade_ts: Option<DateTime<Utc>>,
911    last_candle: Option<Candle>,
912}
913
914impl MarketSnapshot {
915    fn price(&self) -> Option<Price> {
916        self.last_trade
917            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
918    }
919}
920
921#[derive(Clone)]
922pub struct ShutdownSignal {
923    flag: Arc<AtomicBool>,
924    notify: Arc<Notify>,
925}
926
927impl ShutdownSignal {
928    pub fn new() -> Self {
929        let flag = Arc::new(AtomicBool::new(false));
930        let notify = Arc::new(Notify::new());
931        let flag_clone = flag.clone();
932        let notify_clone = notify.clone();
933        tokio::spawn(async move {
934            if tokio::signal::ctrl_c().await.is_ok() {
935                flag_clone.store(true, Ordering::SeqCst);
936                notify_clone.notify_waiters();
937            }
938        });
939        Self { flag, notify }
940    }
941
942    pub fn trigger(&self) {
943        self.flag.store(true, Ordering::SeqCst);
944        self.notify.notify_waiters();
945    }
946
947    fn triggered(&self) -> bool {
948        self.flag.load(Ordering::SeqCst)
949    }
950
951    #[cfg_attr(not(feature = "binance"), allow(dead_code))]
952    async fn wait(&self) {
953        if self.triggered() {
954            return;
955        }
956        self.notify.notified().await;
957    }
958
959    async fn sleep(&self, duration: Duration) -> bool {
960        tokio::select! {
961            _ = tokio::time::sleep(duration) => true,
962            _ = self.notify.notified() => false,
963        }
964    }
965}
966
967impl Default for ShutdownSignal {
968    fn default() -> Self {
969        Self::new()
970    }
971}
972
973#[allow(clippy::too_many_arguments)]
974fn spawn_event_subscribers(
975    bus: Arc<EventBus>,
976    strategy: Arc<Mutex<Box<dyn Strategy>>>,
977    strategy_ctx: Arc<Mutex<StrategyContext>>,
978    orchestrator: Arc<OrderOrchestrator>,
979    portfolio: Arc<Mutex<Portfolio>>,
980    metrics: Arc<LiveMetrics>,
981    alerts: Arc<AlertManager>,
982    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
983    state_repo: Arc<dyn StateRepository>,
984    persisted: Arc<Mutex<LiveState>>,
985    exec_backend: ExecutionBackend,
986) -> Vec<JoinHandle<()>> {
987    let mut handles = Vec::new();
988
989    let market_bus = bus.clone();
990    let market_strategy = strategy.clone();
991    let market_ctx = strategy_ctx.clone();
992    let market_metrics = metrics.clone();
993    let market_alerts = alerts.clone();
994    let market_state = state_repo.clone();
995    let market_persisted = persisted.clone();
996    let market_portfolio = portfolio.clone();
997    let market_snapshot = market.clone();
998    let orchestrator_clone = orchestrator.clone();
999    handles.push(tokio::spawn(async move {
1000        let mut stream = market_bus.subscribe();
1001        loop {
1002            match stream.recv().await {
1003                Ok(Event::Tick(evt)) => {
1004                    if let Err(err) = process_tick_event(
1005                        evt.tick,
1006                        market_strategy.clone(),
1007                        market_ctx.clone(),
1008                        market_metrics.clone(),
1009                        market_alerts.clone(),
1010                        market_snapshot.clone(),
1011                        market_portfolio.clone(),
1012                        market_state.clone(),
1013                        market_persisted.clone(),
1014                        market_bus.clone(),
1015                    )
1016                    .await
1017                    {
1018                        warn!(error = %err, "tick handler failed");
1019                    }
1020                }
1021                Ok(Event::Candle(evt)) => {
1022                    if let Err(err) = process_candle_event(
1023                        evt.candle,
1024                        market_strategy.clone(),
1025                        market_ctx.clone(),
1026                        market_metrics.clone(),
1027                        market_alerts.clone(),
1028                        market_snapshot.clone(),
1029                        market_portfolio.clone(),
1030                        orchestrator_clone.clone(),
1031                        exec_backend,
1032                        market_state.clone(),
1033                        market_persisted.clone(),
1034                        market_bus.clone(),
1035                    )
1036                    .await
1037                    {
1038                        warn!(error = %err, "candle handler failed");
1039                    }
1040                }
1041                Ok(Event::OrderBook(evt)) => {
1042                    if let Err(err) = process_order_book_event(
1043                        evt.order_book,
1044                        market_strategy.clone(),
1045                        market_ctx.clone(),
1046                        market_metrics.clone(),
1047                        market_alerts.clone(),
1048                        market_snapshot.clone(),
1049                        market_bus.clone(),
1050                    )
1051                    .await
1052                    {
1053                        warn!(error = %err, "order book handler failed");
1054                    }
1055                }
1056                Ok(_) => {}
1057                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1058                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1059                    warn!(lag = lag, "market subscriber lagged");
1060                    continue;
1061                }
1062            }
1063        }
1064    }));
1065
1066    let exec_bus = bus.clone();
1067    let exec_portfolio = portfolio.clone();
1068    let exec_market = market.clone();
1069    let exec_persisted = persisted.clone();
1070    let exec_alerts = alerts.clone();
1071    let exec_metrics = metrics.clone();
1072    let exec_orchestrator = orchestrator.clone();
1073    handles.push(tokio::spawn(async move {
1074        let orchestrator = exec_orchestrator.clone();
1075        let mut stream = exec_bus.subscribe();
1076        loop {
1077            match stream.recv().await {
1078                Ok(Event::Signal(evt)) => {
1079                    if let Err(err) = process_signal_event(
1080                        evt.signal,
1081                        orchestrator.clone(),
1082                        exec_portfolio.clone(),
1083                        exec_market.clone(),
1084                        exec_persisted.clone(),
1085                        exec_alerts.clone(),
1086                        exec_metrics.clone(),
1087                    )
1088                    .await
1089                    {
1090                        warn!(error = %err, "signal handler failed");
1091                    }
1092                }
1093                Ok(_) => {}
1094                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1095                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1096                    warn!(lag = lag, "signal subscriber lagged");
1097                    continue;
1098                }
1099            }
1100        }
1101    }));
1102
1103    let fill_bus = bus.clone();
1104    let fill_state = state_repo.clone();
1105    let fill_orchestrator = orchestrator.clone();
1106    let fill_persisted = persisted.clone();
1107    let fill_alerts = alerts.clone();
1108    handles.push(tokio::spawn(async move {
1109        let orchestrator = fill_orchestrator.clone();
1110        let persisted = fill_persisted.clone();
1111        let mut stream = fill_bus.subscribe();
1112        loop {
1113            match stream.recv().await {
1114                Ok(Event::Fill(evt)) => {
1115                    if let Err(err) = process_fill_event(
1116                        evt.fill,
1117                        portfolio.clone(),
1118                        strategy.clone(),
1119                        strategy_ctx.clone(),
1120                        orchestrator.clone(),
1121                        metrics.clone(),
1122                        fill_alerts.clone(),
1123                        fill_state.clone(),
1124                        persisted.clone(),
1125                    )
1126                    .await
1127                    {
1128                        warn!(error = %err, "fill handler failed");
1129                    }
1130                }
1131                Ok(_) => {}
1132                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1133                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1134                    warn!(lag = lag, "fill subscriber lagged");
1135                    continue;
1136                }
1137            }
1138        }
1139    }));
1140
1141    let order_bus = bus.clone();
1142    let order_persisted = persisted.clone();
1143    let order_alerts = alerts.clone();
1144    let order_orchestrator = orchestrator.clone();
1145    // Note: We don't pass strategy to order update handler to avoid lock contention
1146    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
1147    handles.push(tokio::spawn(async move {
1148        let orchestrator = order_orchestrator.clone();
1149        let persisted = order_persisted.clone();
1150        let mut stream = order_bus.subscribe();
1151        loop {
1152            match stream.recv().await {
1153                Ok(Event::OrderUpdate(evt)) => {
1154                    if let Err(err) = process_order_update_event(
1155                        evt.order,
1156                        orchestrator.clone(),
1157                        order_alerts.clone(),
1158                        state_repo.clone(),
1159                        persisted.clone(),
1160                    )
1161                    .await
1162                    {
1163                        warn!(error = %err, "order update handler failed");
1164                    }
1165                }
1166                Ok(_) => {}
1167                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1168                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1169                    warn!(lag = lag, "order subscriber lagged");
1170                    continue;
1171                }
1172            }
1173        }
1174    }));
1175
1176    handles
1177}
1178
1179#[allow(clippy::too_many_arguments)]
1180async fn process_tick_event(
1181    tick: Tick,
1182    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1183    strategy_ctx: Arc<Mutex<StrategyContext>>,
1184    metrics: Arc<LiveMetrics>,
1185    alerts: Arc<AlertManager>,
1186    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1187    portfolio: Arc<Mutex<Portfolio>>,
1188    state_repo: Arc<dyn StateRepository>,
1189    persisted: Arc<Mutex<LiveState>>,
1190    bus: Arc<EventBus>,
1191) -> Result<()> {
1192    metrics.inc_tick();
1193    metrics.update_staleness(0.0);
1194    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1195    alerts.heartbeat().await;
1196    {
1197        let mut guard = market.lock().await;
1198        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1199            snapshot.last_trade = Some(tick.price);
1200            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1201        }
1202    }
1203    let mut drawdown_triggered = false;
1204    let mut snapshot_on_trigger = None;
1205    {
1206        let mut guard = portfolio.lock().await;
1207        let was_liquidate_only = guard.liquidate_only();
1208        match guard.update_market_data(&tick.symbol, tick.price) {
1209            Ok(_) => {
1210                if !was_liquidate_only && guard.liquidate_only() {
1211                    drawdown_triggered = true;
1212                    snapshot_on_trigger = Some(guard.snapshot());
1213                }
1214            }
1215            Err(err) => {
1216                warn!(
1217                    symbol = %tick.symbol,
1218                    error = %err,
1219                    "failed to refresh market data"
1220                );
1221            }
1222        }
1223    }
1224    {
1225        let mut state = persisted.lock().await;
1226        state.last_prices.insert(tick.symbol.clone(), tick.price);
1227        if drawdown_triggered {
1228            if let Some(snapshot) = snapshot_on_trigger.take() {
1229                state.portfolio = Some(snapshot);
1230            }
1231        }
1232    }
1233    if drawdown_triggered {
1234        persist_state(
1235            state_repo.clone(),
1236            persisted.clone(),
1237            Some(strategy.clone()),
1238        )
1239        .await?;
1240        alert_liquidate_only(alerts.clone()).await;
1241    }
1242    {
1243        let mut ctx = strategy_ctx.lock().await;
1244        ctx.push_tick(tick.clone());
1245        let lock_start = Instant::now();
1246        let mut strat = strategy.lock().await;
1247        log_strategy_lock("tick", lock_start.elapsed());
1248        let call_start = Instant::now();
1249        strat
1250            .on_tick(&ctx, &tick)
1251            .await
1252            .context("strategy failure on tick event")?;
1253        log_strategy_call("tick", call_start.elapsed());
1254    }
1255    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1256    Ok(())
1257}
1258
1259#[allow(clippy::too_many_arguments)]
1260async fn process_candle_event(
1261    candle: Candle,
1262    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1263    strategy_ctx: Arc<Mutex<StrategyContext>>,
1264    metrics: Arc<LiveMetrics>,
1265    alerts: Arc<AlertManager>,
1266    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1267    portfolio: Arc<Mutex<Portfolio>>,
1268    orchestrator: Arc<OrderOrchestrator>,
1269    exec_backend: ExecutionBackend,
1270    state_repo: Arc<dyn StateRepository>,
1271    persisted: Arc<Mutex<LiveState>>,
1272    bus: Arc<EventBus>,
1273) -> Result<()> {
1274    metrics.inc_candle();
1275    metrics.update_staleness(0.0);
1276    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1277    alerts.heartbeat().await;
1278    metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1279    {
1280        let mut guard = market.lock().await;
1281        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1282            snapshot.last_candle = Some(candle.clone());
1283            snapshot.last_trade = Some(candle.close);
1284        }
1285    }
1286    if exec_backend.is_paper() {
1287        let client = orchestrator.execution_engine().client();
1288        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1289            paper.update_price(&candle.symbol, candle.close);
1290        }
1291    }
1292    let mut candle_drawdown_triggered = false;
1293    let mut candle_snapshot = None;
1294    {
1295        let mut guard = portfolio.lock().await;
1296        let was_liquidate_only = guard.liquidate_only();
1297        match guard.update_market_data(&candle.symbol, candle.close) {
1298            Ok(_) => {
1299                if !was_liquidate_only && guard.liquidate_only() {
1300                    candle_drawdown_triggered = true;
1301                    candle_snapshot = Some(guard.snapshot());
1302                }
1303            }
1304            Err(err) => {
1305                warn!(
1306                    symbol = %candle.symbol,
1307                    error = %err,
1308                    "failed to refresh market data"
1309                );
1310            }
1311        }
1312    }
1313    if candle_drawdown_triggered {
1314        if let Some(snapshot) = candle_snapshot.take() {
1315            let mut persisted_guard = persisted.lock().await;
1316            persisted_guard.portfolio = Some(snapshot);
1317        }
1318        alert_liquidate_only(alerts.clone()).await;
1319    }
1320    {
1321        let mut ctx = strategy_ctx.lock().await;
1322        ctx.push_candle(candle.clone());
1323        let lock_start = Instant::now();
1324        let mut strat = strategy.lock().await;
1325        log_strategy_lock("candle", lock_start.elapsed());
1326        let call_start = Instant::now();
1327        strat
1328            .on_candle(&ctx, &candle)
1329            .await
1330            .context("strategy failure on candle event")?;
1331        log_strategy_call("candle", call_start.elapsed());
1332    }
1333    {
1334        let mut snapshot = persisted.lock().await;
1335        snapshot.last_candle_ts = Some(candle.timestamp);
1336        snapshot
1337            .last_prices
1338            .insert(candle.symbol.clone(), candle.close);
1339    }
1340    persist_state(
1341        state_repo.clone(),
1342        persisted.clone(),
1343        Some(strategy.clone()),
1344    )
1345    .await?;
1346    let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1347    orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1348    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1349    Ok(())
1350}
1351
1352async fn process_order_book_event(
1353    book: OrderBook,
1354    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1355    strategy_ctx: Arc<Mutex<StrategyContext>>,
1356    metrics: Arc<LiveMetrics>,
1357    alerts: Arc<AlertManager>,
1358    _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1359    bus: Arc<EventBus>,
1360) -> Result<()> {
1361    metrics.update_staleness(0.0);
1362    alerts.heartbeat().await;
1363    {
1364        let mut ctx = strategy_ctx.lock().await;
1365        ctx.push_order_book(book.clone());
1366        let lock_start = Instant::now();
1367        let mut strat = strategy.lock().await;
1368        log_strategy_lock("order_book", lock_start.elapsed());
1369        let call_start = Instant::now();
1370        strat
1371            .on_order_book(&ctx, &book)
1372            .await
1373            .context("strategy failure on order book")?;
1374        log_strategy_call("order_book", call_start.elapsed());
1375    }
1376    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1377    Ok(())
1378}
1379
1380async fn process_signal_event(
1381    signal: Signal,
1382    orchestrator: Arc<OrderOrchestrator>,
1383    portfolio: Arc<Mutex<Portfolio>>,
1384    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1385    persisted: Arc<Mutex<LiveState>>,
1386    alerts: Arc<AlertManager>,
1387    metrics: Arc<LiveMetrics>,
1388) -> Result<()> {
1389    let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1390    orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1391    match orchestrator.on_signal(&signal, &ctx).await {
1392        Ok(()) => {
1393            alerts.reset_order_failures().await;
1394        }
1395        Err(err) => {
1396            metrics.inc_order_failure();
1397            alerts
1398                .order_failure(&format!("orchestrator error: {err}"))
1399                .await;
1400        }
1401    }
1402    Ok(())
1403}
1404
1405fn log_strategy_lock(event: &str, wait: Duration) {
1406    let wait_ms = wait.as_secs_f64() * 1000.0;
1407    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1408        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1409    } else {
1410        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1411    }
1412}
1413
1414fn log_strategy_call(event: &str, elapsed: Duration) {
1415    let duration_ms = elapsed.as_secs_f64() * 1000.0;
1416    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1417        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1418    } else {
1419        trace!(target: "strategy", event, duration_ms, "strategy call completed");
1420    }
1421}
1422
1423#[allow(clippy::too_many_arguments)]
1424async fn process_fill_event(
1425    fill: Fill,
1426    portfolio: Arc<Mutex<Portfolio>>,
1427    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1428    strategy_ctx: Arc<Mutex<StrategyContext>>,
1429    orchestrator: Arc<OrderOrchestrator>,
1430    metrics: Arc<LiveMetrics>,
1431    alerts: Arc<AlertManager>,
1432    state_repo: Arc<dyn StateRepository>,
1433    persisted: Arc<Mutex<LiveState>>,
1434) -> Result<()> {
1435    let mut drawdown_triggered = false;
1436    {
1437        let mut guard = portfolio.lock().await;
1438        let was_liquidate_only = guard.liquidate_only();
1439        guard
1440            .apply_fill(&fill)
1441            .context("Failed to apply fill to portfolio")?;
1442        if !was_liquidate_only && guard.liquidate_only() {
1443            drawdown_triggered = true;
1444        }
1445        let snapshot = guard.snapshot();
1446        let mut persisted_guard = persisted.lock().await;
1447        persisted_guard.portfolio = Some(snapshot);
1448    }
1449    {
1450        let positions = {
1451            let guard = portfolio.lock().await;
1452            guard.positions()
1453        };
1454        let mut ctx = strategy_ctx.lock().await;
1455        ctx.update_positions(positions);
1456    }
1457    orchestrator.on_fill(&fill).await.ok();
1458    {
1459        let ctx = strategy_ctx.lock().await;
1460        let lock_start = Instant::now();
1461        let mut strat = strategy.lock().await;
1462        log_strategy_lock("fill", lock_start.elapsed());
1463        let call_start = Instant::now();
1464        strat
1465            .on_fill(&ctx, &fill)
1466            .await
1467            .context("Strategy failed on fill event")?;
1468        log_strategy_call("fill", call_start.elapsed());
1469    }
1470    let equity = {
1471        let guard = portfolio.lock().await;
1472        guard.equity()
1473    };
1474    if let Some(value) = equity.to_f64() {
1475        metrics.update_equity(value);
1476    }
1477    alerts.update_equity(equity).await;
1478    metrics.inc_order();
1479    alerts
1480        .notify(
1481            "Order Filled",
1482            &format!(
1483                "order filled: {}@{} ({})",
1484                fill.fill_quantity,
1485                fill.fill_price,
1486                match fill.side {
1487                    Side::Buy => "buy",
1488                    Side::Sell => "sell",
1489                }
1490            ),
1491        )
1492        .await;
1493    if drawdown_triggered {
1494        alert_liquidate_only(alerts.clone()).await;
1495    }
1496    persist_state(
1497        state_repo.clone(),
1498        persisted.clone(),
1499        Some(strategy.clone()),
1500    )
1501    .await?;
1502    Ok(())
1503}
1504
1505async fn process_order_update_event(
1506    order: Order,
1507    orchestrator: Arc<OrderOrchestrator>,
1508    alerts: Arc<AlertManager>,
1509    state_repo: Arc<dyn StateRepository>,
1510    persisted: Arc<Mutex<LiveState>>,
1511) -> Result<()> {
1512    orchestrator.on_order_update(&order);
1513    if matches!(order.status, OrderStatus::Rejected) {
1514        error!(
1515            order_id = %order.id,
1516            symbol = %order.request.symbol,
1517            "order rejected by exchange"
1518        );
1519        alerts.order_failure("order rejected by exchange").await;
1520        alerts
1521            .notify(
1522                "Order rejected",
1523                &format!(
1524                    "Order {} for {} was rejected",
1525                    order.id, order.request.symbol
1526                ),
1527            )
1528            .await;
1529    }
1530    {
1531        let mut snapshot = persisted.lock().await;
1532        let mut found = false;
1533        for existing in &mut snapshot.open_orders {
1534            if existing.id == order.id {
1535                *existing = order.clone();
1536                found = true;
1537                break;
1538            }
1539        }
1540        if !found {
1541            snapshot.open_orders.push(order.clone());
1542        }
1543        if matches!(
1544            order.status,
1545            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1546        ) {
1547            snapshot.open_orders.retain(|o| o.id != order.id);
1548        }
1549    }
1550    persist_state(state_repo, persisted, None).await?;
1551    Ok(())
1552}
1553
1554async fn emit_signals(
1555    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1556    bus: Arc<EventBus>,
1557    metrics: Arc<LiveMetrics>,
1558) {
1559    let signals = {
1560        let mut strat = strategy.lock().await;
1561        strat.drain_signals()
1562    };
1563    if signals.is_empty() {
1564        return;
1565    }
1566    metrics.inc_signals(signals.len());
1567    for signal in signals {
1568        bus.publish(Event::Signal(SignalEvent { signal }));
1569    }
1570}
1571
1572async fn persist_state(
1573    repo: Arc<dyn StateRepository>,
1574    persisted: Arc<Mutex<LiveState>>,
1575    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1576) -> Result<()> {
1577    if let Some(strat_lock) = strategy {
1578        // Snapshot strategy state before cloning the full state for persistence
1579        let strat = strat_lock.lock().await;
1580        if let Ok(json_state) = strat.snapshot() {
1581            let mut guard = persisted.lock().await;
1582            guard.strategy_state = Some(json_state);
1583        } else {
1584            warn!("failed to snapshot strategy state");
1585        }
1586    }
1587
1588    let snapshot = {
1589        let guard = persisted.lock().await;
1590        guard.clone()
1591    };
1592    tokio::task::spawn_blocking(move || repo.save(&snapshot))
1593        .await
1594        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1595        .map_err(|err| anyhow!(err.to_string()))
1596}
1597
1598async fn shared_risk_context(
1599    symbol: &str,
1600    portfolio: &Arc<Mutex<Portfolio>>,
1601    market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1602    persisted: &Arc<Mutex<LiveState>>,
1603) -> RiskContext {
1604    let (signed_qty, equity, liquidate_only) = {
1605        let guard = portfolio.lock().await;
1606        (
1607            guard.signed_position_qty(symbol),
1608            guard.equity(),
1609            guard.liquidate_only(),
1610        )
1611    };
1612    let observed_price = {
1613        let guard = market.lock().await;
1614        guard.get(symbol).and_then(|snapshot| snapshot.price())
1615    };
1616    let last_price = if let Some(price) = observed_price {
1617        price
1618    } else {
1619        let guard = persisted.lock().await;
1620        guard
1621            .last_prices
1622            .get(symbol)
1623            .copied()
1624            .unwrap_or(Decimal::ZERO)
1625    };
1626    RiskContext {
1627        signed_position_qty: signed_qty,
1628        portfolio_equity: equity,
1629        last_price,
1630        liquidate_only,
1631    }
1632}
1633
1634async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1635    alerts
1636        .notify(
1637            "Max drawdown triggered",
1638            "Portfolio entered liquidate-only mode; new exposure blocked until review",
1639        )
1640        .await;
1641}
1642
1643fn spawn_connection_monitor(
1644    shutdown: ShutdownSignal,
1645    flag: Arc<AtomicBool>,
1646    metrics: Arc<LiveMetrics>,
1647    stream: &'static str,
1648) -> JoinHandle<()> {
1649    tokio::spawn(async move {
1650        loop {
1651            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1652            if !shutdown.sleep(Duration::from_secs(5)).await {
1653                break;
1654            }
1655        }
1656    })
1657}
1658
1659fn spawn_order_timeout_monitor(
1660    orchestrator: Arc<OrderOrchestrator>,
1661    bus: Arc<EventBus>,
1662    alerts: Arc<AlertManager>,
1663    shutdown: ShutdownSignal,
1664) -> JoinHandle<()> {
1665    tokio::spawn(async move {
1666        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1667        loop {
1668            ticker.tick().await;
1669            if shutdown.triggered() {
1670                break;
1671            }
1672            match orchestrator.poll_stale_orders().await {
1673                Ok(updates) => {
1674                    for order in updates {
1675                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1676                            let message = format!(
1677                                "Order {} for {} timed out after {}s",
1678                                order.id,
1679                                order.request.symbol,
1680                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1681                            );
1682                            error!(%message);
1683                            alerts.order_failure(&message).await;
1684                            alerts.notify("Order timeout", &message).await;
1685                        }
1686                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1687                    }
1688                }
1689                Err(err) => {
1690                    warn!(error = %err, "order timeout monitor failed");
1691                }
1692            }
1693        }
1694    })
1695}
1696
1697async fn load_market_registry(
1698    client: Arc<dyn ExecutionClient>,
1699    settings: &LiveSessionSettings,
1700) -> Result<Arc<MarketRegistry>> {
1701    if let Some(path) = &settings.markets_file {
1702        let registry = MarketRegistry::load_from_file(path)
1703            .with_context(|| format!("failed to load markets from {}", path.display()))?;
1704        return Ok(Arc::new(registry));
1705    }
1706
1707    if settings.exec_backend.is_paper() {
1708        return Err(anyhow!(
1709            "paper execution requires --markets-file when exchange metadata is unavailable"
1710        ));
1711    }
1712
1713    let instruments = client
1714        .list_instruments(settings.category.as_path())
1715        .await
1716        .context("failed to fetch instruments from execution client")?;
1717    let registry =
1718        MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1719    Ok(Arc::new(registry))
1720}
1721
/// Spawns the task that keeps the Bybit private WebSocket connected and
/// forwards order and execution updates to `private_tx`.
///
/// Each connection cycle:
/// 1. Connects via `tesser_bybit::ws::connect_private`. On failure it marks the
///    private stream disconnected and retries after 5s.
/// 2. Marks the private stream connected (flag + `"private"` metric).
/// 3. Reconciles state missed while disconnected (see
///    `reconcile_bybit_private_state`).
/// 4. Forwards `order` / `execution` topic messages until the socket ends or
///    `shutdown` fires.
/// 5. Marks the stream disconnected and reconnects after a short pause.
///
/// Shutdown is observed both while waiting for socket messages and during
/// retry pauses, so the task exits promptly.
#[cfg(feature = "bybit")]
// The argument list mirrors everything the task owns; bundling it into a
// struct would only move the same fields elsewhere.
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    // Pause after a failed connection attempt.
    const CONNECT_RETRY_DELAY: Duration = Duration::from_secs(5);
    // Pause after an established session drops. This keeps a flapping socket
    // from re-running REST reconciliation in a tight loop.
    const RECONNECT_DELAY: Duration = Duration::from_secs(1);

    tokio::spawn(async move {
        while !shutdown.triggered() {
            let connection = tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await;
            let mut socket = match connection {
                Ok(socket) => socket,
                Err(e) => {
                    bybit_mark_private_connection(
                        private_connection_flag.as_deref(),
                        &metrics,
                        false,
                    );
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                    if !shutdown.sleep(CONNECT_RETRY_DELAY).await {
                        break;
                    }
                    continue;
                }
            };

            bybit_mark_private_connection(private_connection_flag.as_deref(), &metrics, true);
            info!("Connected to Bybit private WebSocket stream");
            reconcile_bybit_private_state(&*exec_client, &symbols, &last_sync, &private_tx).await;

            loop {
                if shutdown.triggered() {
                    break;
                }
                // Race the socket against shutdown so an idle stream cannot block exit.
                let next = tokio::select! {
                    next = socket.next() => next,
                    _ = shutdown.wait() => None,
                };
                let Some(msg) = next else {
                    break;
                };
                if let Ok(Message::Text(text)) = msg {
                    forward_bybit_private_message(&text, &private_tx).await;
                }
            }

            // The session is over either way; do not leave the stream reported as up.
            bybit_mark_private_connection(private_connection_flag.as_deref(), &metrics, false);
            if shutdown.triggered() {
                break;
            }
            warn!("Bybit private WebSocket stream closed; reconnecting");
            if !shutdown.sleep(RECONNECT_DELAY).await {
                break;
            }
        }
    });
}

/// Records the private-stream connection state in the optional shared flag and
/// in the `"private"` connection metric.
#[cfg(feature = "bybit")]
fn bybit_mark_private_connection(flag: Option<&AtomicBool>, metrics: &LiveMetrics, connected: bool) {
    if let Some(flag) = flag {
        flag.store(connected, Ordering::SeqCst);
    }
    metrics.update_connection_status("private", connected);
}

/// Replays state that may have been missed while the private socket was down.
///
/// It re-emits every open order for each symbol in `symbols`. When
/// `exec_client` is a `BybitClient`, it also re-emits executions since
/// `last_sync`, or the last 30 minutes when no sync has happened yet. Failures
/// are logged, not propagated.
///
/// `last_sync` advances only when the execution fetch succeeds. A failed fetch
/// therefore does not drop its time window; the next reconnect retries it.
#[cfg(feature = "bybit")]
async fn reconcile_bybit_private_state(
    exec_client: &dyn ExecutionClient,
    symbols: &[String],
    last_sync: &tokio::sync::Mutex<Option<DateTime<Utc>>>,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    for symbol in symbols {
        match exec_client.list_open_orders(symbol).await {
            Ok(orders) => {
                for order in orders {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send reconciled order update: {err}");
                    }
                }
            }
            Err(e) => {
                error!("failed to reconcile open orders for {symbol}: {e}");
            }
        }
    }

    let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() else {
        return;
    };
    let since = {
        let guard = last_sync.lock().await;
        guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
    };
    match bybit.list_executions_since(since).await {
        Ok(fills) => {
            for fill in fills {
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send reconciled fill: {err}");
                }
            }
            *last_sync.lock().await = Some(Utc::now());
        }
        Err(e) => {
            error!("failed to reconcile executions since {:?}: {}", since, e);
        }
    }
}

/// Decodes one text frame from the Bybit private socket and forwards any
/// `order` / `execution` payload entries to `private_tx`.
///
/// Frames that are not JSON, have no string `topic`, carry another topic, or
/// fail to decode or convert are silently ignored.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    let Ok(value) = serde_json::from_str::<Value>(text) else {
        return;
    };
    // Own the topic so `value` can be moved into the typed decode without a clone.
    let Some(topic) = value
        .get("topic")
        .and_then(Value::as_str)
        .map(str::to_owned)
    else {
        return;
    };
    match topic.as_str() {
        "order" => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) else {
                return;
            };
            for update in msg.data {
                if let Ok(order) = update.to_tesser_order(None) {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send private order update: {err}");
                    }
                }
            }
        }
        "execution" => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value) else {
                return;
            };
            for exec in msg.data {
                if let Ok(fill) = exec.to_tesser_fill() {
                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                        error!("failed to send private fill event: {err}");
                    }
                }
            }
        }
        _ => {}
    }
}
1859
/// Spawns the task that keeps a Binance user-data stream connected and
/// forwards its order and fill updates to `private_tx`.
///
/// Each cycle:
/// 1. Obtains a listen key with `start_user_stream`. On failure it retries
///    after 5s.
/// 2. Connects the user-data WebSocket and marks the private stream connected
///    (flag + `"private"` metric).
/// 3. Registers an event callback and starts a keep-alive task that refreshes
///    the listen key every 30 minutes.
/// 4. Waits until the stream reports `ListenKeyExpired` (then reconnects) or
///    `shutdown` fires (then unsubscribes and exits).
///
/// Between cycles the stream is marked disconnected, and the task pauses 5s
/// before the next cycle. All pauses are shutdown-aware. If `exec_client` is
/// not a `BinanceClient`, the task logs a warning and exits.
#[cfg(feature = "binance")]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    const RETRY_DELAY: Duration = Duration::from_secs(5);
    const KEEPALIVE_PERIOD: Duration = Duration::from_secs(30 * 60);

    tokio::spawn(async move {
        loop {
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // A persistently failing REST endpoint must not keep this
                    // task alive after shutdown.
                    if !shutdown.sleep(RETRY_DELAY).await {
                        break;
                    }
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);

                    // The SDK invokes the event callback itself. If that happens on a
                    // Tokio worker thread, `Sender::blocking_send` panics
                    // (NOTE(review): confirm the SDK's calling context). Queue events on
                    // an unbounded channel instead: `send` never blocks, preserves
                    // order, and a forwarder task applies backpressure asynchronously.
                    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<BrokerEvent>();
                    let forward_tx = private_tx.clone();
                    tokio::spawn(async move {
                        while let Some(event) = event_rx.recv().await {
                            if forward_tx.send(event).await.is_err() {
                                // Consumer is gone; nothing left to deliver to.
                                break;
                            }
                        }
                    });

                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = event_tx.send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = event_tx.send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });

                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(KEEPALIVE_PERIOD);
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if client.keepalive_user_stream().await.is_err() {
                                warn!(
                                    "failed to keep Binance listen key alive; waiting for expiry to reconnect"
                                );
                                break;
                            }
                        }
                    });

                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {
                            keepalive_handle.abort();
                            let _ = user_stream.unsubscribe().await;
                            return;
                        }
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            if !shutdown.sleep(RETRY_DELAY).await {
                break;
            }
        }
    });
}