tesser_cli/
live.rs

1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5    atomic::{AtomicBool, Ordering},
6    Arc,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use tesser_broker::{ExecutionClient, MarketStream};
21use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
22use tesser_bybit::{
23    BybitClient, BybitConfig, BybitCredentials, BybitMarketStream, BybitSubscription, PublicChannel,
24};
25use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
26use tesser_core::{
27    Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
28    Symbol, Tick,
29};
30use tesser_events::{
31    CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
32    TickEvent,
33};
34use tesser_execution::{
35    BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
36    RiskContext, RiskLimits, SqliteAlgoStateRepository,
37};
38use tesser_markets::MarketRegistry;
39use tesser_paper::PaperExecutionClient;
40use tesser_portfolio::{
41    LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
42};
43use tesser_strategy::{Strategy, StrategyContext};
44
45use crate::alerts::{AlertDispatcher, AlertManager};
46use crate::telemetry::{spawn_metrics_server, LiveMetrics};
47
48/// Unified event type for asynchronous updates from the broker.
49#[derive(Debug)]
50pub enum BrokerEvent {
51    OrderUpdate(Order),
52    Fill(Fill),
53}
54
55#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
56#[value(rename_all = "kebab-case")]
57pub enum ExecutionBackend {
58    Paper,
59    Live,
60}
61
62impl ExecutionBackend {
63    fn is_paper(self) -> bool {
64        matches!(self, Self::Paper)
65    }
66}
67
/// Depth requested for every order-book subscription.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;
/// Threshold for warning about slow acquisition of the strategy lock.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
/// Threshold for warning about slow strategy callbacks.
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
71
72pub struct LiveSessionSettings {
73    pub category: PublicChannel,
74    pub interval: Interval,
75    pub quantity: Quantity,
76    pub slippage_bps: Decimal,
77    pub fee_bps: Decimal,
78    pub history: usize,
79    pub metrics_addr: SocketAddr,
80    pub state_path: PathBuf,
81    pub initial_balances: HashMap<Symbol, Decimal>,
82    pub reporting_currency: Symbol,
83    pub markets_file: Option<PathBuf>,
84    pub alerting: AlertingConfig,
85    pub exec_backend: ExecutionBackend,
86    pub risk: RiskManagementConfig,
87    pub reconciliation_interval: Duration,
88    pub reconciliation_threshold: Decimal,
89}
90
91impl LiveSessionSettings {
92    fn risk_limits(&self) -> RiskLimits {
93        RiskLimits {
94            max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
95            max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
96        }
97    }
98}
99
100pub async fn run_live(
101    strategy: Box<dyn Strategy>,
102    symbols: Vec<String>,
103    exchange: ExchangeConfig,
104    settings: LiveSessionSettings,
105) -> Result<()> {
106    run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
107}
108
109/// Variant of [`run_live`] that accepts a manually controlled shutdown signal.
110pub async fn run_live_with_shutdown(
111    strategy: Box<dyn Strategy>,
112    symbols: Vec<String>,
113    exchange: ExchangeConfig,
114    settings: LiveSessionSettings,
115    shutdown: ShutdownSignal,
116) -> Result<()> {
117    if symbols.is_empty() {
118        return Err(anyhow!("strategy did not declare any subscriptions"));
119    }
120    if settings.quantity <= Decimal::ZERO {
121        return Err(anyhow!("--quantity must be positive"));
122    }
123
124    let public_connection = Arc::new(AtomicBool::new(false));
125    let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
126        Some(Arc::new(AtomicBool::new(false)))
127    } else {
128        None
129    };
130    let mut stream = BybitMarketStream::connect_public(
131        &exchange.ws_url,
132        settings.category,
133        Some(public_connection.clone()),
134    )
135    .await
136    .context("failed to connect to Bybit WebSocket")?;
137    for symbol in &symbols {
138        stream
139            .subscribe(BybitSubscription::Trades {
140                symbol: symbol.clone(),
141            })
142            .await
143            .with_context(|| format!("failed to subscribe to trades for {symbol}"))?;
144        stream
145            .subscribe(BybitSubscription::Kline {
146                symbol: symbol.clone(),
147                interval: settings.interval,
148            })
149            .await
150            .with_context(|| format!("failed to subscribe to klines for {symbol}"))?;
151        stream
152            .subscribe(BybitSubscription::OrderBook {
153                symbol: symbol.clone(),
154                depth: DEFAULT_ORDER_BOOK_DEPTH,
155            })
156            .await
157            .with_context(|| format!("failed to subscribe to order books for {symbol}"))?;
158    }
159
160    let execution_client = build_execution_client(&exchange, &settings)?;
161    let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
162    if matches!(settings.exec_backend, ExecutionBackend::Live) {
163        info!(
164            rest = %exchange.rest_url,
165            category = ?settings.category,
166            "live execution enabled via Bybit REST"
167        );
168    }
169    let risk_checker: Arc<dyn PreTradeRiskChecker> =
170        Arc::new(BasicRiskChecker::new(settings.risk_limits()));
171    let execution = ExecutionEngine::new(
172        execution_client.clone(),
173        Box::new(FixedOrderSizer {
174            quantity: settings.quantity,
175        }),
176        risk_checker,
177    );
178
179    // Create algorithm state repository
180    let algo_repo_path = settings.state_path.with_extension("algos.db");
181    let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
182
183    // Create orchestrator with execution engine
184    let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
185
186    let runtime = LiveRuntime::new(
187        stream,
188        strategy,
189        symbols,
190        orchestrator,
191        settings,
192        market_registry,
193        shutdown,
194        public_connection,
195        private_connection,
196    )
197    .await?;
198    runtime.run().await
199}
200
201fn build_execution_client(
202    exchange: &ExchangeConfig,
203    settings: &LiveSessionSettings,
204) -> Result<Arc<dyn ExecutionClient>> {
205    match settings.exec_backend {
206        ExecutionBackend::Paper => Ok(Arc::new(PaperExecutionClient::new(
207            "paper".to_string(),
208            vec!["BTCUSDT".to_string()],
209            settings.slippage_bps,
210            settings.fee_bps,
211        ))),
212        ExecutionBackend::Live => {
213            let api_key = exchange.api_key.trim();
214            let api_secret = exchange.api_secret.trim();
215            if api_key.is_empty() || api_secret.is_empty() {
216                bail!("exchange profile is missing api_key/api_secret required for live execution");
217            }
218            let client = BybitClient::new(
219                BybitConfig {
220                    base_url: exchange.rest_url.clone(),
221                    category: settings.category.as_path().to_string(),
222                    recv_window: 5_000,
223                    ws_url: Some(exchange.ws_url.clone()),
224                },
225                Some(BybitCredentials {
226                    api_key: api_key.to_string(),
227                    api_secret: api_secret.to_string(),
228                }),
229            );
230            Ok(Arc::new(client))
231        }
232    }
233}
234
235struct LiveRuntime {
236    stream: BybitMarketStream,
237    orchestrator: Arc<OrderOrchestrator>,
238    state_repo: Arc<dyn StateRepository>,
239    persisted: Arc<Mutex<LiveState>>,
240    event_bus: Arc<EventBus>,
241    shutdown: ShutdownSignal,
242    metrics_task: JoinHandle<()>,
243    alert_task: Option<JoinHandle<()>>,
244    reconciliation_task: Option<JoinHandle<()>>,
245    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
246    private_event_rx: mpsc::Receiver<BrokerEvent>,
247    #[allow(dead_code)]
248    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
249    subscriber_handles: Vec<JoinHandle<()>>,
250    connection_monitors: Vec<JoinHandle<()>>,
251    order_timeout_task: JoinHandle<()>,
252    strategy: Arc<Mutex<Box<dyn Strategy>>>,
253    _public_connection: Arc<AtomicBool>,
254    _private_connection: Option<Arc<AtomicBool>>,
255}
256
257impl LiveRuntime {
258    #[allow(clippy::too_many_arguments)]
259    async fn new(
260        stream: BybitMarketStream,
261        mut strategy: Box<dyn Strategy>,
262        symbols: Vec<String>,
263        orchestrator: OrderOrchestrator,
264        settings: LiveSessionSettings,
265        market_registry: Arc<MarketRegistry>,
266        shutdown: ShutdownSignal,
267        public_connection: Arc<AtomicBool>,
268        private_connection: Option<Arc<AtomicBool>>,
269    ) -> Result<Self> {
270        let mut strategy_ctx = StrategyContext::new(settings.history);
271        let state_repo: Arc<dyn StateRepository> =
272            Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
273        let mut persisted = match tokio::task::spawn_blocking({
274            let repo = state_repo.clone();
275            move || repo.load()
276        })
277        .await
278        {
279            Ok(Ok(state)) => state,
280            Ok(Err(err)) => {
281                warn!(error = %err, "failed to load live state; starting from defaults");
282                LiveState::default()
283            }
284            Err(err) => {
285                warn!(error = %err, "state load task failed; starting from defaults");
286                LiveState::default()
287            }
288        };
289        let mut live_bootstrap = None;
290        if matches!(settings.exec_backend, ExecutionBackend::Live) {
291            info!("Synchronizing portfolio state from exchange");
292            let execution_client = orchestrator.execution_engine().client();
293            let positions = execution_client
294                .positions()
295                .await
296                .context("failed to fetch remote positions")?;
297            let balances = execution_client
298                .account_balances()
299                .await
300                .context("failed to fetch remote account balances")?;
301            let mut open_orders = Vec::new();
302            for symbol in &symbols {
303                let mut symbol_orders = execution_client
304                    .list_open_orders(symbol)
305                    .await
306                    .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
307                open_orders.append(&mut symbol_orders);
308            }
309            persisted.open_orders = open_orders;
310            live_bootstrap = Some((positions, balances));
311        }
312
313        let portfolio_cfg = PortfolioConfig {
314            initial_balances: settings.initial_balances.clone(),
315            reporting_currency: settings.reporting_currency.clone(),
316            max_drawdown: Some(settings.risk.max_drawdown),
317        };
318        let portfolio = if let Some((positions, balances)) = live_bootstrap {
319            Portfolio::from_exchange_state(
320                positions,
321                balances,
322                portfolio_cfg.clone(),
323                market_registry.clone(),
324            )
325        } else if let Some(snapshot) = persisted.portfolio.take() {
326            Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
327        } else {
328            Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
329        };
330        strategy_ctx.update_positions(portfolio.positions());
331        // Restore strategy state if found in persistence
332        if let Some(state) = persisted.strategy_state.take() {
333            info!("restoring strategy state from persistence");
334            strategy
335                .restore(state)
336                .context("failed to restore strategy state")?;
337        }
338        persisted.portfolio = Some(portfolio.snapshot());
339
340        let mut market = HashMap::new();
341        for symbol in &symbols {
342            let mut snapshot = MarketSnapshot::default();
343            if let Some(price) = persisted.last_prices.get(symbol).copied() {
344                snapshot.last_trade = Some(price);
345            }
346            market.insert(symbol.clone(), snapshot);
347        }
348
349        let metrics = LiveMetrics::new();
350        metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
351        if let Some(flag) = &private_connection {
352            metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
353        }
354        let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
355        let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
356        let alerts = AlertManager::new(
357            settings.alerting,
358            dispatcher,
359            Some(public_connection.clone()),
360            private_connection.clone(),
361        );
362        let (private_event_tx, private_event_rx) = mpsc::channel(1024);
363        let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
364        let alerts = Arc::new(alerts);
365        let alert_task = alerts.spawn_watchdog();
366        let metrics = Arc::new(metrics);
367        let mut connection_monitors = Vec::new();
368        connection_monitors.push(spawn_connection_monitor(
369            shutdown.clone(),
370            public_connection.clone(),
371            metrics.clone(),
372            "public",
373        ));
374        if let Some(flag) = private_connection.clone() {
375            connection_monitors.push(spawn_connection_monitor(
376                shutdown.clone(),
377                flag,
378                metrics.clone(),
379                "private",
380            ));
381        }
382
383        if !settings.exec_backend.is_paper() {
384            let execution_engine = orchestrator.execution_engine();
385            let bybit_creds = match execution_engine.credentials() {
386                Some(creds) => creds,
387                None => bail!("live execution requires Bybit credentials"),
388            };
389            let ws_url = execution_engine.ws_url();
390            let private_tx = private_event_tx.clone();
391            let exec_client = execution_engine.client();
392            let symbols_for_private = symbols.clone();
393            let last_sync_handle = last_private_sync.clone();
394            let private_connection_flag = private_connection.clone();
395            let metrics_for_private = metrics.clone();
396            tokio::spawn(async move {
397                let creds = bybit_creds;
398                let endpoint = ws_url;
399                let client = exec_client;
400                let symbols = symbols_for_private;
401                let last_sync = last_sync_handle;
402                loop {
403                    match tesser_bybit::ws::connect_private(
404                        &endpoint,
405                        &creds,
406                        private_connection_flag.clone(),
407                    )
408                    .await
409                    {
410                        Ok(mut socket) => {
411                            if let Some(flag) = &private_connection_flag {
412                                flag.store(true, Ordering::SeqCst);
413                            }
414                            metrics_for_private.update_connection_status("private", true);
415                            info!("Connected to Bybit private WebSocket stream");
416                            // Incremental reconciliation right after a successful reconnect
417                            // 1) Sync open orders
418                            for symbol in &symbols {
419                                match client.list_open_orders(symbol).await {
420                                    Ok(orders) => {
421                                        for order in orders {
422                                            if let Err(err) = private_tx
423                                                .send(BrokerEvent::OrderUpdate(order))
424                                                .await
425                                            {
426                                                error!(
427                                                    "failed to send reconciled order update: {err}"
428                                                );
429                                            }
430                                        }
431                                    }
432                                    Err(e) => {
433                                        error!("failed to reconcile open orders for {symbol}: {e}");
434                                    }
435                                }
436                            }
437
438                            // 2) Fetch any executions since the last sync timestamp
439                            if let Some(bybit) =
440                                client.as_any().downcast_ref::<tesser_bybit::BybitClient>()
441                            {
442                                let since = {
443                                    let guard = last_sync.lock().await;
444                                    // Default to 30 minutes ago if missing
445                                    guard.unwrap_or_else(|| {
446                                        Utc::now() - chrono::Duration::minutes(30)
447                                    })
448                                };
449                                match bybit.list_executions_since(since).await {
450                                    Ok(fills) => {
451                                        for fill in fills {
452                                            if let Err(err) =
453                                                private_tx.send(BrokerEvent::Fill(fill)).await
454                                            {
455                                                error!("failed to send reconciled fill: {err}");
456                                            }
457                                        }
458                                    }
459                                    Err(e) => {
460                                        error!(
461                                            "failed to reconcile executions since {:?}: {}",
462                                            since, e
463                                        );
464                                    }
465                                }
466                                // Update last sync time to now regardless to avoid tight loops
467                                let mut guard = last_sync.lock().await;
468                                *guard = Some(Utc::now());
469                            }
470
471                            while let Some(msg) = socket.next().await {
472                                if let Ok(Message::Text(text)) = msg {
473                                    if let Ok(value) =
474                                        serde_json::from_str::<serde_json::Value>(&text)
475                                    {
476                                        if let Some(topic) =
477                                            value.get("topic").and_then(|v| v.as_str())
478                                        {
479                                            match topic {
480                                                "order" => {
481                                                    if let Ok(msg) = serde_json::from_value::<
482                                                        PrivateMessage<BybitWsOrder>,
483                                                    >(
484                                                        value.clone()
485                                                    ) {
486                                                        for update in msg.data {
487                                                            match update.to_tesser_order(None) {
488                                                                Ok(order) => {
489                                                                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
490                                                                        error!("failed to send private order update: {err}");
491                                                                    }
492                                                                }
493                                                                Err(err) => error!("failed to convert order update: {err}"),
494                                                            }
495                                                        }
496                                                    }
497                                                }
498                                                "execution" => {
499                                                    if let Ok(msg) = serde_json::from_value::<
500                                                        PrivateMessage<BybitWsExecution>,
501                                                    >(
502                                                        value.clone()
503                                                    ) {
504                                                        for exec in msg.data {
505                                                            match exec.to_tesser_fill() {
506                                                                Ok(fill) => {
507                                                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
508                                                                        error!("failed to send private fill event: {err}");
509                                                                    }
510                                                                }
511                                                                Err(err) => error!("failed to parse execution: {err}"),
512                                                            }
513                                                        }
514                                                    }
515                                                }
516                                                _ => {}
517                                            }
518                                        }
519                                    }
520                                }
521                            }
522                        }
523                        Err(e) => {
524                            if let Some(flag) = &private_connection_flag {
525                                flag.store(false, Ordering::SeqCst);
526                            }
527                            metrics_for_private.update_connection_status("private", false);
528                            error!("Private WebSocket connection failed: {e}. Retrying...");
529                        }
530                    }
531                    if let Some(flag) = &private_connection_flag {
532                        flag.store(false, Ordering::SeqCst);
533                    }
534                    metrics_for_private.update_connection_status("private", false);
535                    tokio::time::sleep(Duration::from_secs(5)).await;
536                }
537            });
538        }
539
540        let strategy = Arc::new(Mutex::new(strategy));
541        let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
542        let portfolio = Arc::new(Mutex::new(portfolio));
543        let market = Arc::new(Mutex::new(market));
544        let persisted = Arc::new(Mutex::new(persisted));
545        let orchestrator = Arc::new(orchestrator);
546        let event_bus = Arc::new(EventBus::new(2048));
547        let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
548            Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
549                client: orchestrator.execution_engine().client(),
550                portfolio: portfolio.clone(),
551                persisted: persisted.clone(),
552                state_repo: state_repo.clone(),
553                alerts: alerts.clone(),
554                metrics: metrics.clone(),
555                reporting_currency: settings.reporting_currency.clone(),
556                threshold: settings.reconciliation_threshold,
557            }))
558        });
559        let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
560            spawn_reconciliation_loop(
561                ctx.clone(),
562                shutdown.clone(),
563                settings.reconciliation_interval,
564            )
565        });
566        let subscriber_handles = spawn_event_subscribers(
567            event_bus.clone(),
568            strategy.clone(),
569            strategy_ctx.clone(),
570            orchestrator.clone(),
571            portfolio.clone(),
572            metrics.clone(),
573            alerts.clone(),
574            market.clone(),
575            state_repo.clone(),
576            persisted.clone(),
577            settings.exec_backend,
578        );
579        let order_timeout_task = spawn_order_timeout_monitor(
580            orchestrator.clone(),
581            event_bus.clone(),
582            alerts.clone(),
583            shutdown.clone(),
584        );
585
586        info!(
587            symbols = ?symbols,
588            category = ?settings.category,
589            metrics_addr = %settings.metrics_addr,
590            state_path = %settings.state_path.display(),
591            history = settings.history,
592            "market stream ready"
593        );
594
595        for symbol in &symbols {
596            let ctx = shared_risk_context(symbol, &portfolio, &market, &persisted).await;
597            orchestrator.update_risk_context(symbol.clone(), ctx);
598        }
599
600        Ok(Self {
601            stream,
602            orchestrator,
603            state_repo,
604            persisted,
605            event_bus,
606            shutdown,
607            metrics_task,
608            alert_task,
609            reconciliation_task,
610            reconciliation_ctx,
611            private_event_rx,
612            last_private_sync,
613            subscriber_handles,
614            connection_monitors,
615            order_timeout_task,
616            strategy,
617            _public_connection: public_connection,
618            _private_connection: private_connection,
619        })
620    }
621
622    async fn run(mut self) -> Result<()> {
623        info!("live session started");
624        if let Some(ctx) = self.reconciliation_ctx.as_ref() {
625            perform_state_reconciliation(ctx.as_ref())
626                .await
627                .context("initial state reconciliation failed")?;
628        }
629        let backoff = Duration::from_millis(200);
630        let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
631
632        while !self.shutdown.triggered() {
633            let mut progressed = false;
634
635            if let Some(tick) = self.stream.next_tick().await? {
636                progressed = true;
637                self.event_bus.publish(Event::Tick(TickEvent { tick }));
638            }
639
640            if let Some(candle) = self.stream.next_candle().await? {
641                progressed = true;
642                self.event_bus
643                    .publish(Event::Candle(CandleEvent { candle }));
644            }
645
646            if let Some(book) = self.stream.next_order_book().await? {
647                progressed = true;
648                self.event_bus
649                    .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
650            }
651
652            tokio::select! {
653                biased;
654                Some(event) = self.private_event_rx.recv() => {
655                    progressed = true;
656                    match event {
657                        BrokerEvent::OrderUpdate(order) => {
658                            info!(
659                                order_id = %order.id,
660                                status = ?order.status,
661                                symbol = %order.request.symbol,
662                                "received private order update"
663                            );
664                            self.event_bus
665                                .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
666                        }
667                        BrokerEvent::Fill(fill) => {
668                            info!(
669                                order_id = %fill.order_id,
670                                symbol = %fill.symbol,
671                                qty = %fill.fill_quantity,
672                                price = %fill.fill_price,
673                                "received private fill"
674                            );
675                            self.event_bus.publish(Event::Fill(FillEvent { fill }));
676                        }
677                    }
678                }
679                _ = orchestrator_timer.tick() => {
680                    // Drive TWAP and other time-based algorithms
681                    if let Err(e) = self.orchestrator.on_timer_tick().await {
682                        error!("Orchestrator timer tick failed: {}", e);
683                    }
684                }
685                else => {}
686            }
687
688            if !progressed && !self.shutdown.sleep(backoff).await {
689                break;
690            }
691        }
692        info!("live session stopping");
693        self.metrics_task.abort();
694        if let Some(handle) = self.alert_task.take() {
695            handle.abort();
696        }
697        if let Some(handle) = self.reconciliation_task.take() {
698            handle.abort();
699        }
700        self.order_timeout_task.abort();
701        for handle in self.subscriber_handles.drain(..) {
702            handle.abort();
703        }
704        for handle in self.connection_monitors.drain(..) {
705            handle.abort();
706        }
707        if let Err(err) = self.save_state().await {
708            warn!(error = %err, "failed to persist shutdown state");
709        }
710        Ok(())
711    }
712
713    async fn save_state(&self) -> Result<()> {
714        persist_state(
715            self.state_repo.clone(),
716            self.persisted.clone(),
717            Some(self.strategy.clone()),
718        )
719        .await
720    }
721}
722
723struct ReconciliationContext {
724    client: Arc<dyn ExecutionClient>,
725    portfolio: Arc<Mutex<Portfolio>>,
726    persisted: Arc<Mutex<LiveState>>,
727    state_repo: Arc<dyn StateRepository>,
728    alerts: Arc<AlertManager>,
729    metrics: Arc<LiveMetrics>,
730    reporting_currency: Symbol,
731    threshold: Decimal,
732}
733
734struct ReconciliationContextConfig {
735    client: Arc<dyn ExecutionClient>,
736    portfolio: Arc<Mutex<Portfolio>>,
737    persisted: Arc<Mutex<LiveState>>,
738    state_repo: Arc<dyn StateRepository>,
739    alerts: Arc<AlertManager>,
740    metrics: Arc<LiveMetrics>,
741    reporting_currency: Symbol,
742    threshold: Decimal,
743}
744
745impl ReconciliationContext {
746    fn new(config: ReconciliationContextConfig) -> Self {
747        let ReconciliationContextConfig {
748            client,
749            portfolio,
750            persisted,
751            state_repo,
752            alerts,
753            metrics,
754            reporting_currency,
755            threshold,
756        } = config;
757        let min_threshold = Decimal::new(1, 6); // 0.000001 as a practical floor
758        let threshold = if threshold <= Decimal::ZERO {
759            min_threshold
760        } else {
761            threshold
762        };
763        Self {
764            client,
765            portfolio,
766            persisted,
767            state_repo,
768            alerts,
769            metrics,
770            reporting_currency,
771            threshold,
772        }
773    }
774}
775
776fn spawn_reconciliation_loop(
777    ctx: Arc<ReconciliationContext>,
778    shutdown: ShutdownSignal,
779    interval: Duration,
780) -> JoinHandle<()> {
781    tokio::spawn(async move {
782        while shutdown.sleep(interval).await {
783            if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
784                error!(error = %err, "periodic state reconciliation failed");
785            }
786        }
787    })
788}
789
790async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
791    info!("running state reconciliation");
792    let remote_positions = ctx
793        .client
794        .positions()
795        .await
796        .context("failed to fetch remote positions")?;
797    let remote_balances = ctx
798        .client
799        .account_balances()
800        .await
801        .context("failed to fetch remote balances")?;
802    let (local_positions, local_cash) = {
803        let guard = ctx.portfolio.lock().await;
804        (guard.positions(), guard.cash())
805    };
806
807    let remote_map = positions_to_map(remote_positions);
808    let local_map = positions_to_map(local_positions);
809    let mut tracked_symbols: HashSet<String> = HashSet::new();
810    tracked_symbols.extend(remote_map.keys().cloned());
811    tracked_symbols.extend(local_map.keys().cloned());
812
813    let mut severe_findings = Vec::new();
814    for symbol in tracked_symbols {
815        let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
816        let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
817        let diff = (local_qty - remote_qty).abs();
818        let diff_value = diff.to_f64().unwrap_or(0.0);
819        ctx.metrics.update_position_diff(&symbol, diff_value);
820        if diff > Decimal::ZERO {
821            warn!(
822                symbol = %symbol,
823                local = %local_qty,
824                remote = %remote_qty,
825                diff = %diff,
826                "position mismatch detected during reconciliation"
827            );
828            let pct = normalize_diff(diff, remote_qty);
829            if pct >= ctx.threshold {
830                error!(
831                    symbol = %symbol,
832                    local = %local_qty,
833                    remote = %remote_qty,
834                    diff = %diff,
835                    pct = %pct,
836                    "position mismatch exceeds threshold"
837                );
838                severe_findings.push(format!(
839                    "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
840                ));
841            }
842        }
843    }
844
845    let reporting = ctx.reporting_currency.as_str();
846    let remote_cash = remote_balances
847        .iter()
848        .find(|balance| balance.currency == reporting)
849        .map(|balance| balance.available)
850        .unwrap_or_else(|| Decimal::ZERO);
851    let cash_diff = (remote_cash - local_cash).abs();
852    ctx.metrics
853        .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
854    if cash_diff > Decimal::ZERO {
855        warn!(
856            currency = %reporting,
857            local = %local_cash,
858            remote = %remote_cash,
859            diff = %cash_diff,
860            "balance mismatch detected during reconciliation"
861        );
862        let pct = normalize_diff(cash_diff, remote_cash);
863        if pct >= ctx.threshold {
864            error!(
865                currency = %reporting,
866                local = %local_cash,
867                remote = %remote_cash,
868                diff = %cash_diff,
869                pct = %pct,
870                "balance mismatch exceeds threshold"
871            );
872            severe_findings.push(format!(
873                "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
874            ));
875        }
876    }
877
878    if severe_findings.is_empty() {
879        info!("state reconciliation complete with no critical divergence");
880        return Ok(());
881    }
882
883    let alert_body = severe_findings.join("; ");
884    ctx.alerts
885        .notify("State reconciliation divergence", &alert_body)
886        .await;
887    enforce_liquidate_only(ctx).await;
888    Ok(())
889}
890
891async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
892    let snapshot = {
893        let mut guard = ctx.portfolio.lock().await;
894        if !guard.set_liquidate_only(true) {
895            return;
896        }
897        info!("entering liquidate-only mode due to reconciliation divergence");
898        guard.snapshot()
899    };
900    {
901        let mut state = ctx.persisted.lock().await;
902        state.portfolio = Some(snapshot);
903    }
904    if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
905        warn!(error = %err, "failed to persist liquidate-only transition");
906    }
907}
908
909fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
910    let mut map = HashMap::new();
911    for position in positions {
912        map.insert(position.symbol.clone(), position_signed_qty(&position));
913    }
914    map
915}
916
917fn position_signed_qty(position: &Position) -> Decimal {
918    match position.side {
919        Some(Side::Buy) => position.quantity,
920        Some(Side::Sell) => -position.quantity,
921        None => Decimal::ZERO,
922    }
923}
924
925fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
926    if diff <= Decimal::ZERO {
927        Decimal::ZERO
928    } else {
929        let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
930        diff / denominator
931    }
932}
933
934#[derive(Default)]
935struct MarketSnapshot {
936    last_trade: Option<Price>,
937    last_trade_ts: Option<DateTime<Utc>>,
938    last_candle: Option<Candle>,
939}
940
941impl MarketSnapshot {
942    fn price(&self) -> Option<Price> {
943        self.last_trade
944            .or_else(|| self.last_candle.as_ref().map(|c| c.close))
945    }
946}
947
948#[derive(Clone)]
949pub struct ShutdownSignal {
950    flag: Arc<AtomicBool>,
951    notify: Arc<Notify>,
952}
953
954impl ShutdownSignal {
955    pub fn new() -> Self {
956        let flag = Arc::new(AtomicBool::new(false));
957        let notify = Arc::new(Notify::new());
958        let flag_clone = flag.clone();
959        let notify_clone = notify.clone();
960        tokio::spawn(async move {
961            if tokio::signal::ctrl_c().await.is_ok() {
962                flag_clone.store(true, Ordering::SeqCst);
963                notify_clone.notify_waiters();
964            }
965        });
966        Self { flag, notify }
967    }
968
969    pub fn trigger(&self) {
970        self.flag.store(true, Ordering::SeqCst);
971        self.notify.notify_waiters();
972    }
973
974    fn triggered(&self) -> bool {
975        self.flag.load(Ordering::SeqCst)
976    }
977
978    async fn sleep(&self, duration: Duration) -> bool {
979        tokio::select! {
980            _ = tokio::time::sleep(duration) => true,
981            _ = self.notify.notified() => false,
982        }
983    }
984}
985
986impl Default for ShutdownSignal {
987    fn default() -> Self {
988        Self::new()
989    }
990}
991
/// Spawns the long-running event-bus subscriber tasks that drive the live session.
///
/// Four tasks are started, each with its own `bus.subscribe()` receiver:
/// - market data: `Tick`, `Candle` and `OrderBook` events → `process_*_event` handlers;
/// - signals: `Signal` events → [`process_signal_event`];
/// - fills: `Fill` events → [`process_fill_event`];
/// - order updates: `OrderUpdate` events → [`process_order_update_event`].
///
/// In every task, handler errors are logged with `warn!` and the loop continues.
/// `RecvError::Lagged` (this receiver fell behind and events were skipped) is logged and
/// ignored, and `RecvError::Closed` ends the task. Returns the tasks' join handles so the
/// caller can abort them on shutdown.
#[allow(clippy::too_many_arguments)]
fn spawn_event_subscribers(
    bus: Arc<EventBus>,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    orchestrator: Arc<OrderOrchestrator>,
    portfolio: Arc<Mutex<Portfolio>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
    state_repo: Arc<dyn StateRepository>,
    persisted: Arc<Mutex<LiveState>>,
    exec_backend: ExecutionBackend,
) -> Vec<JoinHandle<()>> {
    let mut handles = Vec::new();

    // Market-data task: per-task clones of the shared handles it needs.
    let market_bus = bus.clone();
    let market_strategy = strategy.clone();
    let market_ctx = strategy_ctx.clone();
    let market_metrics = metrics.clone();
    let market_alerts = alerts.clone();
    let market_state = state_repo.clone();
    let market_persisted = persisted.clone();
    let market_portfolio = portfolio.clone();
    let market_snapshot = market.clone();
    let orchestrator_clone = orchestrator.clone();
    handles.push(tokio::spawn(async move {
        let mut stream = market_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Tick(evt)) => {
                    if let Err(err) = process_tick_event(
                        evt.tick,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "tick handler failed");
                    }
                }
                Ok(Event::Candle(evt)) => {
                    if let Err(err) = process_candle_event(
                        evt.candle,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        orchestrator_clone.clone(),
                        exec_backend,
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "candle handler failed");
                    }
                }
                Ok(Event::OrderBook(evt)) => {
                    if let Err(err) = process_order_book_event(
                        evt.order_book,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_bus.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order book handler failed");
                    }
                }
                // Events handled by the other subscriber tasks.
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    // `lag` events were skipped for this receiver; keep consuming.
                    warn!(lag = lag, "market subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // Signal task: routes strategy signals through the orchestrator.
    let exec_bus = bus.clone();
    let exec_portfolio = portfolio.clone();
    let exec_market = market.clone();
    let exec_persisted = persisted.clone();
    let exec_alerts = alerts.clone();
    let exec_metrics = metrics.clone();
    let exec_orchestrator = orchestrator.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = exec_orchestrator.clone();
        let mut stream = exec_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Signal(evt)) => {
                    if let Err(err) = process_signal_event(
                        evt.signal,
                        orchestrator.clone(),
                        exec_portfolio.clone(),
                        exec_market.clone(),
                        exec_persisted.clone(),
                        exec_alerts.clone(),
                        exec_metrics.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "signal handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "signal subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // Fill task: takes ownership of `portfolio`, `strategy`, `strategy_ctx` and `metrics`
    // (moved into the closure below; earlier tasks use clones).
    let fill_bus = bus.clone();
    let fill_state = state_repo.clone();
    let fill_orchestrator = orchestrator.clone();
    let fill_persisted = persisted.clone();
    let fill_alerts = alerts.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = fill_orchestrator.clone();
        let persisted = fill_persisted.clone();
        let mut stream = fill_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Fill(evt)) => {
                    if let Err(err) = process_fill_event(
                        evt.fill,
                        portfolio.clone(),
                        strategy.clone(),
                        strategy_ctx.clone(),
                        orchestrator.clone(),
                        metrics.clone(),
                        fill_alerts.clone(),
                        fill_state.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "fill handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "fill subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // Order-update task: takes ownership of `state_repo` (moved into the closure below).
    let order_bus = bus.clone();
    let order_persisted = persisted.clone();
    let order_alerts = alerts.clone();
    let order_orchestrator = orchestrator.clone();
    // Note: We don't pass strategy to order update handler to avoid lock contention
    // on high-frequency updates. Strategy state is snapshotted on candles/fills.
    handles.push(tokio::spawn(async move {
        let orchestrator = order_orchestrator.clone();
        let persisted = order_persisted.clone();
        let mut stream = order_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::OrderUpdate(evt)) => {
                    if let Err(err) = process_order_update_event(
                        evt.order,
                        orchestrator.clone(),
                        order_alerts.clone(),
                        state_repo.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order update handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "order subscriber lagged");
                    continue;
                }
            }
        }
    }));

    handles
}
1197
1198#[allow(clippy::too_many_arguments)]
1199async fn process_tick_event(
1200    tick: Tick,
1201    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1202    strategy_ctx: Arc<Mutex<StrategyContext>>,
1203    metrics: Arc<LiveMetrics>,
1204    alerts: Arc<AlertManager>,
1205    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1206    portfolio: Arc<Mutex<Portfolio>>,
1207    state_repo: Arc<dyn StateRepository>,
1208    persisted: Arc<Mutex<LiveState>>,
1209    bus: Arc<EventBus>,
1210) -> Result<()> {
1211    metrics.inc_tick();
1212    metrics.update_staleness(0.0);
1213    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1214    alerts.heartbeat().await;
1215    {
1216        let mut guard = market.lock().await;
1217        if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1218            snapshot.last_trade = Some(tick.price);
1219            snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1220        }
1221    }
1222    let mut drawdown_triggered = false;
1223    let mut snapshot_on_trigger = None;
1224    {
1225        let mut guard = portfolio.lock().await;
1226        let was_liquidate_only = guard.liquidate_only();
1227        match guard.update_market_data(&tick.symbol, tick.price) {
1228            Ok(_) => {
1229                if !was_liquidate_only && guard.liquidate_only() {
1230                    drawdown_triggered = true;
1231                    snapshot_on_trigger = Some(guard.snapshot());
1232                }
1233            }
1234            Err(err) => {
1235                warn!(
1236                    symbol = %tick.symbol,
1237                    error = %err,
1238                    "failed to refresh market data"
1239                );
1240            }
1241        }
1242    }
1243    {
1244        let mut state = persisted.lock().await;
1245        state.last_prices.insert(tick.symbol.clone(), tick.price);
1246        if drawdown_triggered {
1247            if let Some(snapshot) = snapshot_on_trigger.take() {
1248                state.portfolio = Some(snapshot);
1249            }
1250        }
1251    }
1252    if drawdown_triggered {
1253        persist_state(
1254            state_repo.clone(),
1255            persisted.clone(),
1256            Some(strategy.clone()),
1257        )
1258        .await?;
1259        alert_liquidate_only(alerts.clone()).await;
1260    }
1261    {
1262        let mut ctx = strategy_ctx.lock().await;
1263        ctx.push_tick(tick.clone());
1264        let lock_start = Instant::now();
1265        let mut strat = strategy.lock().await;
1266        log_strategy_lock("tick", lock_start.elapsed());
1267        let call_start = Instant::now();
1268        strat
1269            .on_tick(&ctx, &tick)
1270            .await
1271            .context("strategy failure on tick event")?;
1272        log_strategy_call("tick", call_start.elapsed());
1273    }
1274    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1275    Ok(())
1276}
1277
1278#[allow(clippy::too_many_arguments)]
1279async fn process_candle_event(
1280    candle: Candle,
1281    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1282    strategy_ctx: Arc<Mutex<StrategyContext>>,
1283    metrics: Arc<LiveMetrics>,
1284    alerts: Arc<AlertManager>,
1285    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1286    portfolio: Arc<Mutex<Portfolio>>,
1287    orchestrator: Arc<OrderOrchestrator>,
1288    exec_backend: ExecutionBackend,
1289    state_repo: Arc<dyn StateRepository>,
1290    persisted: Arc<Mutex<LiveState>>,
1291    bus: Arc<EventBus>,
1292) -> Result<()> {
1293    metrics.inc_candle();
1294    metrics.update_staleness(0.0);
1295    metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1296    alerts.heartbeat().await;
1297    metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1298    {
1299        let mut guard = market.lock().await;
1300        if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1301            snapshot.last_candle = Some(candle.clone());
1302            snapshot.last_trade = Some(candle.close);
1303        }
1304    }
1305    if exec_backend.is_paper() {
1306        let client = orchestrator.execution_engine().client();
1307        if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1308            paper.update_price(&candle.symbol, candle.close);
1309        }
1310    }
1311    let mut candle_drawdown_triggered = false;
1312    let mut candle_snapshot = None;
1313    {
1314        let mut guard = portfolio.lock().await;
1315        let was_liquidate_only = guard.liquidate_only();
1316        match guard.update_market_data(&candle.symbol, candle.close) {
1317            Ok(_) => {
1318                if !was_liquidate_only && guard.liquidate_only() {
1319                    candle_drawdown_triggered = true;
1320                    candle_snapshot = Some(guard.snapshot());
1321                }
1322            }
1323            Err(err) => {
1324                warn!(
1325                    symbol = %candle.symbol,
1326                    error = %err,
1327                    "failed to refresh market data"
1328                );
1329            }
1330        }
1331    }
1332    if candle_drawdown_triggered {
1333        if let Some(snapshot) = candle_snapshot.take() {
1334            let mut persisted_guard = persisted.lock().await;
1335            persisted_guard.portfolio = Some(snapshot);
1336        }
1337        alert_liquidate_only(alerts.clone()).await;
1338    }
1339    {
1340        let mut ctx = strategy_ctx.lock().await;
1341        ctx.push_candle(candle.clone());
1342        let lock_start = Instant::now();
1343        let mut strat = strategy.lock().await;
1344        log_strategy_lock("candle", lock_start.elapsed());
1345        let call_start = Instant::now();
1346        strat
1347            .on_candle(&ctx, &candle)
1348            .await
1349            .context("strategy failure on candle event")?;
1350        log_strategy_call("candle", call_start.elapsed());
1351    }
1352    {
1353        let mut snapshot = persisted.lock().await;
1354        snapshot.last_candle_ts = Some(candle.timestamp);
1355        snapshot
1356            .last_prices
1357            .insert(candle.symbol.clone(), candle.close);
1358    }
1359    persist_state(
1360        state_repo.clone(),
1361        persisted.clone(),
1362        Some(strategy.clone()),
1363    )
1364    .await?;
1365    let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1366    orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1367    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1368    Ok(())
1369}
1370
1371async fn process_order_book_event(
1372    book: OrderBook,
1373    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1374    strategy_ctx: Arc<Mutex<StrategyContext>>,
1375    metrics: Arc<LiveMetrics>,
1376    alerts: Arc<AlertManager>,
1377    _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1378    bus: Arc<EventBus>,
1379) -> Result<()> {
1380    metrics.update_staleness(0.0);
1381    alerts.heartbeat().await;
1382    {
1383        let mut ctx = strategy_ctx.lock().await;
1384        ctx.push_order_book(book.clone());
1385        let lock_start = Instant::now();
1386        let mut strat = strategy.lock().await;
1387        log_strategy_lock("order_book", lock_start.elapsed());
1388        let call_start = Instant::now();
1389        strat
1390            .on_order_book(&ctx, &book)
1391            .await
1392            .context("strategy failure on order book")?;
1393        log_strategy_call("order_book", call_start.elapsed());
1394    }
1395    emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1396    Ok(())
1397}
1398
1399async fn process_signal_event(
1400    signal: Signal,
1401    orchestrator: Arc<OrderOrchestrator>,
1402    portfolio: Arc<Mutex<Portfolio>>,
1403    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1404    persisted: Arc<Mutex<LiveState>>,
1405    alerts: Arc<AlertManager>,
1406    metrics: Arc<LiveMetrics>,
1407) -> Result<()> {
1408    let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1409    orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1410    match orchestrator.on_signal(&signal, &ctx).await {
1411        Ok(()) => {
1412            alerts.reset_order_failures().await;
1413        }
1414        Err(err) => {
1415            metrics.inc_order_failure();
1416            alerts
1417                .order_failure(&format!("orchestrator error: {err}"))
1418                .await;
1419        }
1420    }
1421    Ok(())
1422}
1423
1424fn log_strategy_lock(event: &str, wait: Duration) {
1425    let wait_ms = wait.as_secs_f64() * 1000.0;
1426    if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1427        warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1428    } else {
1429        trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1430    }
1431}
1432
1433fn log_strategy_call(event: &str, elapsed: Duration) {
1434    let duration_ms = elapsed.as_secs_f64() * 1000.0;
1435    if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1436        warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1437    } else {
1438        trace!(target: "strategy", event, duration_ms, "strategy call completed");
1439    }
1440}
1441
1442#[allow(clippy::too_many_arguments)]
1443async fn process_fill_event(
1444    fill: Fill,
1445    portfolio: Arc<Mutex<Portfolio>>,
1446    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1447    strategy_ctx: Arc<Mutex<StrategyContext>>,
1448    orchestrator: Arc<OrderOrchestrator>,
1449    metrics: Arc<LiveMetrics>,
1450    alerts: Arc<AlertManager>,
1451    state_repo: Arc<dyn StateRepository>,
1452    persisted: Arc<Mutex<LiveState>>,
1453) -> Result<()> {
1454    let mut drawdown_triggered = false;
1455    {
1456        let mut guard = portfolio.lock().await;
1457        let was_liquidate_only = guard.liquidate_only();
1458        guard
1459            .apply_fill(&fill)
1460            .context("Failed to apply fill to portfolio")?;
1461        if !was_liquidate_only && guard.liquidate_only() {
1462            drawdown_triggered = true;
1463        }
1464        let snapshot = guard.snapshot();
1465        let mut persisted_guard = persisted.lock().await;
1466        persisted_guard.portfolio = Some(snapshot);
1467    }
1468    {
1469        let positions = {
1470            let guard = portfolio.lock().await;
1471            guard.positions()
1472        };
1473        let mut ctx = strategy_ctx.lock().await;
1474        ctx.update_positions(positions);
1475    }
1476    orchestrator.on_fill(&fill).await.ok();
1477    {
1478        let ctx = strategy_ctx.lock().await;
1479        let lock_start = Instant::now();
1480        let mut strat = strategy.lock().await;
1481        log_strategy_lock("fill", lock_start.elapsed());
1482        let call_start = Instant::now();
1483        strat
1484            .on_fill(&ctx, &fill)
1485            .await
1486            .context("Strategy failed on fill event")?;
1487        log_strategy_call("fill", call_start.elapsed());
1488    }
1489    let equity = {
1490        let guard = portfolio.lock().await;
1491        guard.equity()
1492    };
1493    if let Some(value) = equity.to_f64() {
1494        metrics.update_equity(value);
1495    }
1496    alerts.update_equity(equity).await;
1497    metrics.inc_order();
1498    alerts
1499        .notify(
1500            "Order Filled",
1501            &format!(
1502                "order filled: {}@{} ({})",
1503                fill.fill_quantity,
1504                fill.fill_price,
1505                match fill.side {
1506                    Side::Buy => "buy",
1507                    Side::Sell => "sell",
1508                }
1509            ),
1510        )
1511        .await;
1512    if drawdown_triggered {
1513        alert_liquidate_only(alerts.clone()).await;
1514    }
1515    persist_state(
1516        state_repo.clone(),
1517        persisted.clone(),
1518        Some(strategy.clone()),
1519    )
1520    .await?;
1521    Ok(())
1522}
1523
1524async fn process_order_update_event(
1525    order: Order,
1526    orchestrator: Arc<OrderOrchestrator>,
1527    alerts: Arc<AlertManager>,
1528    state_repo: Arc<dyn StateRepository>,
1529    persisted: Arc<Mutex<LiveState>>,
1530) -> Result<()> {
1531    orchestrator.on_order_update(&order);
1532    if matches!(order.status, OrderStatus::Rejected) {
1533        error!(
1534            order_id = %order.id,
1535            symbol = %order.request.symbol,
1536            "order rejected by exchange"
1537        );
1538        alerts.order_failure("order rejected by exchange").await;
1539        alerts
1540            .notify(
1541                "Order rejected",
1542                &format!(
1543                    "Order {} for {} was rejected",
1544                    order.id, order.request.symbol
1545                ),
1546            )
1547            .await;
1548    }
1549    {
1550        let mut snapshot = persisted.lock().await;
1551        let mut found = false;
1552        for existing in &mut snapshot.open_orders {
1553            if existing.id == order.id {
1554                *existing = order.clone();
1555                found = true;
1556                break;
1557            }
1558        }
1559        if !found {
1560            snapshot.open_orders.push(order.clone());
1561        }
1562        if matches!(
1563            order.status,
1564            OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1565        ) {
1566            snapshot.open_orders.retain(|o| o.id != order.id);
1567        }
1568    }
1569    persist_state(state_repo, persisted, None).await?;
1570    Ok(())
1571}
1572
1573async fn emit_signals(
1574    strategy: Arc<Mutex<Box<dyn Strategy>>>,
1575    bus: Arc<EventBus>,
1576    metrics: Arc<LiveMetrics>,
1577) {
1578    let signals = {
1579        let mut strat = strategy.lock().await;
1580        strat.drain_signals()
1581    };
1582    if signals.is_empty() {
1583        return;
1584    }
1585    metrics.inc_signals(signals.len());
1586    for signal in signals {
1587        bus.publish(Event::Signal(SignalEvent { signal }));
1588    }
1589}
1590
1591async fn persist_state(
1592    repo: Arc<dyn StateRepository>,
1593    persisted: Arc<Mutex<LiveState>>,
1594    strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1595) -> Result<()> {
1596    if let Some(strat_lock) = strategy {
1597        // Snapshot strategy state before cloning the full state for persistence
1598        let strat = strat_lock.lock().await;
1599        if let Ok(json_state) = strat.snapshot() {
1600            let mut guard = persisted.lock().await;
1601            guard.strategy_state = Some(json_state);
1602        } else {
1603            warn!("failed to snapshot strategy state");
1604        }
1605    }
1606
1607    let snapshot = {
1608        let guard = persisted.lock().await;
1609        guard.clone()
1610    };
1611    tokio::task::spawn_blocking(move || repo.save(&snapshot))
1612        .await
1613        .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1614        .map_err(|err| anyhow!(err.to_string()))
1615}
1616
1617async fn shared_risk_context(
1618    symbol: &str,
1619    portfolio: &Arc<Mutex<Portfolio>>,
1620    market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1621    persisted: &Arc<Mutex<LiveState>>,
1622) -> RiskContext {
1623    let (signed_qty, equity, liquidate_only) = {
1624        let guard = portfolio.lock().await;
1625        (
1626            guard.signed_position_qty(symbol),
1627            guard.equity(),
1628            guard.liquidate_only(),
1629        )
1630    };
1631    let observed_price = {
1632        let guard = market.lock().await;
1633        guard.get(symbol).and_then(|snapshot| snapshot.price())
1634    };
1635    let last_price = if let Some(price) = observed_price {
1636        price
1637    } else {
1638        let guard = persisted.lock().await;
1639        guard
1640            .last_prices
1641            .get(symbol)
1642            .copied()
1643            .unwrap_or(Decimal::ZERO)
1644    };
1645    RiskContext {
1646        signed_position_qty: signed_qty,
1647        portfolio_equity: equity,
1648        last_price,
1649        liquidate_only,
1650    }
1651}
1652
1653async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1654    alerts
1655        .notify(
1656            "Max drawdown triggered",
1657            "Portfolio entered liquidate-only mode; new exposure blocked until review",
1658        )
1659        .await;
1660}
1661
1662fn spawn_connection_monitor(
1663    shutdown: ShutdownSignal,
1664    flag: Arc<AtomicBool>,
1665    metrics: Arc<LiveMetrics>,
1666    stream: &'static str,
1667) -> JoinHandle<()> {
1668    tokio::spawn(async move {
1669        loop {
1670            metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1671            if !shutdown.sleep(Duration::from_secs(5)).await {
1672                break;
1673            }
1674        }
1675    })
1676}
1677
1678fn spawn_order_timeout_monitor(
1679    orchestrator: Arc<OrderOrchestrator>,
1680    bus: Arc<EventBus>,
1681    alerts: Arc<AlertManager>,
1682    shutdown: ShutdownSignal,
1683) -> JoinHandle<()> {
1684    tokio::spawn(async move {
1685        let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1686        loop {
1687            ticker.tick().await;
1688            if shutdown.triggered() {
1689                break;
1690            }
1691            match orchestrator.poll_stale_orders().await {
1692                Ok(updates) => {
1693                    for order in updates {
1694                        if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1695                            let message = format!(
1696                                "Order {} for {} timed out after {}s",
1697                                order.id,
1698                                order.request.symbol,
1699                                tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1700                            );
1701                            error!(%message);
1702                            alerts.order_failure(&message).await;
1703                            alerts.notify("Order timeout", &message).await;
1704                        }
1705                        bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1706                    }
1707                }
1708                Err(err) => {
1709                    warn!(error = %err, "order timeout monitor failed");
1710                }
1711            }
1712        }
1713    })
1714}
1715
1716async fn load_market_registry(
1717    client: Arc<dyn ExecutionClient>,
1718    settings: &LiveSessionSettings,
1719) -> Result<Arc<MarketRegistry>> {
1720    if let Some(path) = &settings.markets_file {
1721        let registry = MarketRegistry::load_from_file(path)
1722            .with_context(|| format!("failed to load markets from {}", path.display()))?;
1723        return Ok(Arc::new(registry));
1724    }
1725
1726    if settings.exec_backend.is_paper() {
1727        return Err(anyhow!(
1728            "paper execution requires --markets-file when exchange metadata is unavailable"
1729        ));
1730    }
1731
1732    let instruments = client
1733        .list_instruments(settings.category.as_path())
1734        .await
1735        .context("failed to fetch instruments from execution client")?;
1736    let registry =
1737        MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1738    Ok(Arc::new(registry))
1739}