1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use tesser_broker::{ExecutionClient, MarketStream};
21use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
22use tesser_bybit::{
23 BybitClient, BybitConfig, BybitCredentials, BybitMarketStream, BybitSubscription, PublicChannel,
24};
25use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
26use tesser_core::{
27 Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
28 Symbol, Tick,
29};
30use tesser_events::{
31 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
32 TickEvent,
33};
34use tesser_execution::{
35 BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
36 RiskContext, RiskLimits, SqliteAlgoStateRepository,
37};
38use tesser_markets::MarketRegistry;
39use tesser_paper::PaperExecutionClient;
40use tesser_portfolio::{
41 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
42};
43use tesser_strategy::{Strategy, StrategyContext};
44
45use crate::alerts::{AlertDispatcher, AlertManager};
46use crate::telemetry::{spawn_metrics_server, LiveMetrics};
47
48#[derive(Debug)]
50pub enum BrokerEvent {
51 OrderUpdate(Order),
52 Fill(Fill),
53}
54
55#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
56#[value(rename_all = "kebab-case")]
57pub enum ExecutionBackend {
58 Paper,
59 Live,
60}
61
62impl ExecutionBackend {
63 fn is_paper(self) -> bool {
64 matches!(self, Self::Paper)
65 }
66}
67
/// Number of price levels requested per order-book subscription.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;
/// Lock-acquisition duration above which strategy access is considered slow
/// (presumably used by handlers further down this file to emit a warning).
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
/// Strategy-callback duration above which a call is considered slow
/// (presumably used by handlers further down this file to emit a warning).
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
71
72pub struct LiveSessionSettings {
73 pub category: PublicChannel,
74 pub interval: Interval,
75 pub quantity: Quantity,
76 pub slippage_bps: Decimal,
77 pub fee_bps: Decimal,
78 pub history: usize,
79 pub metrics_addr: SocketAddr,
80 pub state_path: PathBuf,
81 pub initial_balances: HashMap<Symbol, Decimal>,
82 pub reporting_currency: Symbol,
83 pub markets_file: Option<PathBuf>,
84 pub alerting: AlertingConfig,
85 pub exec_backend: ExecutionBackend,
86 pub risk: RiskManagementConfig,
87 pub reconciliation_interval: Duration,
88 pub reconciliation_threshold: Decimal,
89}
90
91impl LiveSessionSettings {
92 fn risk_limits(&self) -> RiskLimits {
93 RiskLimits {
94 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
95 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
96 }
97 }
98}
99
100pub async fn run_live(
101 strategy: Box<dyn Strategy>,
102 symbols: Vec<String>,
103 exchange: ExchangeConfig,
104 settings: LiveSessionSettings,
105) -> Result<()> {
106 run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
107}
108
109pub async fn run_live_with_shutdown(
111 strategy: Box<dyn Strategy>,
112 symbols: Vec<String>,
113 exchange: ExchangeConfig,
114 settings: LiveSessionSettings,
115 shutdown: ShutdownSignal,
116) -> Result<()> {
117 if symbols.is_empty() {
118 return Err(anyhow!("strategy did not declare any subscriptions"));
119 }
120 if settings.quantity <= Decimal::ZERO {
121 return Err(anyhow!("--quantity must be positive"));
122 }
123
124 let public_connection = Arc::new(AtomicBool::new(false));
125 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
126 Some(Arc::new(AtomicBool::new(false)))
127 } else {
128 None
129 };
130 let mut stream = BybitMarketStream::connect_public(
131 &exchange.ws_url,
132 settings.category,
133 Some(public_connection.clone()),
134 )
135 .await
136 .context("failed to connect to Bybit WebSocket")?;
137 for symbol in &symbols {
138 stream
139 .subscribe(BybitSubscription::Trades {
140 symbol: symbol.clone(),
141 })
142 .await
143 .with_context(|| format!("failed to subscribe to trades for {symbol}"))?;
144 stream
145 .subscribe(BybitSubscription::Kline {
146 symbol: symbol.clone(),
147 interval: settings.interval,
148 })
149 .await
150 .with_context(|| format!("failed to subscribe to klines for {symbol}"))?;
151 stream
152 .subscribe(BybitSubscription::OrderBook {
153 symbol: symbol.clone(),
154 depth: DEFAULT_ORDER_BOOK_DEPTH,
155 })
156 .await
157 .with_context(|| format!("failed to subscribe to order books for {symbol}"))?;
158 }
159
160 let execution_client = build_execution_client(&exchange, &settings)?;
161 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
162 if matches!(settings.exec_backend, ExecutionBackend::Live) {
163 info!(
164 rest = %exchange.rest_url,
165 category = ?settings.category,
166 "live execution enabled via Bybit REST"
167 );
168 }
169 let risk_checker: Arc<dyn PreTradeRiskChecker> =
170 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
171 let execution = ExecutionEngine::new(
172 execution_client.clone(),
173 Box::new(FixedOrderSizer {
174 quantity: settings.quantity,
175 }),
176 risk_checker,
177 );
178
179 let algo_repo_path = settings.state_path.with_extension("algos.db");
181 let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
182
183 let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
185
186 let runtime = LiveRuntime::new(
187 stream,
188 strategy,
189 symbols,
190 orchestrator,
191 settings,
192 market_registry,
193 shutdown,
194 public_connection,
195 private_connection,
196 )
197 .await?;
198 runtime.run().await
199}
200
201fn build_execution_client(
202 exchange: &ExchangeConfig,
203 settings: &LiveSessionSettings,
204) -> Result<Arc<dyn ExecutionClient>> {
205 match settings.exec_backend {
206 ExecutionBackend::Paper => Ok(Arc::new(PaperExecutionClient::new(
207 "paper".to_string(),
208 vec!["BTCUSDT".to_string()],
209 settings.slippage_bps,
210 settings.fee_bps,
211 ))),
212 ExecutionBackend::Live => {
213 let api_key = exchange.api_key.trim();
214 let api_secret = exchange.api_secret.trim();
215 if api_key.is_empty() || api_secret.is_empty() {
216 bail!("exchange profile is missing api_key/api_secret required for live execution");
217 }
218 let client = BybitClient::new(
219 BybitConfig {
220 base_url: exchange.rest_url.clone(),
221 category: settings.category.as_path().to_string(),
222 recv_window: 5_000,
223 ws_url: Some(exchange.ws_url.clone()),
224 },
225 Some(BybitCredentials {
226 api_key: api_key.to_string(),
227 api_secret: api_secret.to_string(),
228 }),
229 );
230 Ok(Arc::new(client))
231 }
232 }
233}
234
235struct LiveRuntime {
236 stream: BybitMarketStream,
237 orchestrator: Arc<OrderOrchestrator>,
238 state_repo: Arc<dyn StateRepository>,
239 persisted: Arc<Mutex<LiveState>>,
240 event_bus: Arc<EventBus>,
241 shutdown: ShutdownSignal,
242 metrics_task: JoinHandle<()>,
243 alert_task: Option<JoinHandle<()>>,
244 reconciliation_task: Option<JoinHandle<()>>,
245 reconciliation_ctx: Option<Arc<ReconciliationContext>>,
246 private_event_rx: mpsc::Receiver<BrokerEvent>,
247 #[allow(dead_code)]
248 last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
249 subscriber_handles: Vec<JoinHandle<()>>,
250 connection_monitors: Vec<JoinHandle<()>>,
251 order_timeout_task: JoinHandle<()>,
252 strategy: Arc<Mutex<Box<dyn Strategy>>>,
253 _public_connection: Arc<AtomicBool>,
254 _private_connection: Option<Arc<AtomicBool>>,
255}
256
257impl LiveRuntime {
258 #[allow(clippy::too_many_arguments)]
259 async fn new(
260 stream: BybitMarketStream,
261 mut strategy: Box<dyn Strategy>,
262 symbols: Vec<String>,
263 orchestrator: OrderOrchestrator,
264 settings: LiveSessionSettings,
265 market_registry: Arc<MarketRegistry>,
266 shutdown: ShutdownSignal,
267 public_connection: Arc<AtomicBool>,
268 private_connection: Option<Arc<AtomicBool>>,
269 ) -> Result<Self> {
270 let mut strategy_ctx = StrategyContext::new(settings.history);
271 let state_repo: Arc<dyn StateRepository> =
272 Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
273 let mut persisted = match tokio::task::spawn_blocking({
274 let repo = state_repo.clone();
275 move || repo.load()
276 })
277 .await
278 {
279 Ok(Ok(state)) => state,
280 Ok(Err(err)) => {
281 warn!(error = %err, "failed to load live state; starting from defaults");
282 LiveState::default()
283 }
284 Err(err) => {
285 warn!(error = %err, "state load task failed; starting from defaults");
286 LiveState::default()
287 }
288 };
289 let mut live_bootstrap = None;
290 if matches!(settings.exec_backend, ExecutionBackend::Live) {
291 info!("Synchronizing portfolio state from exchange");
292 let execution_client = orchestrator.execution_engine().client();
293 let positions = execution_client
294 .positions()
295 .await
296 .context("failed to fetch remote positions")?;
297 let balances = execution_client
298 .account_balances()
299 .await
300 .context("failed to fetch remote account balances")?;
301 let mut open_orders = Vec::new();
302 for symbol in &symbols {
303 let mut symbol_orders = execution_client
304 .list_open_orders(symbol)
305 .await
306 .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
307 open_orders.append(&mut symbol_orders);
308 }
309 persisted.open_orders = open_orders;
310 live_bootstrap = Some((positions, balances));
311 }
312
313 let portfolio_cfg = PortfolioConfig {
314 initial_balances: settings.initial_balances.clone(),
315 reporting_currency: settings.reporting_currency.clone(),
316 max_drawdown: Some(settings.risk.max_drawdown),
317 };
318 let portfolio = if let Some((positions, balances)) = live_bootstrap {
319 Portfolio::from_exchange_state(
320 positions,
321 balances,
322 portfolio_cfg.clone(),
323 market_registry.clone(),
324 )
325 } else if let Some(snapshot) = persisted.portfolio.take() {
326 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
327 } else {
328 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
329 };
330 strategy_ctx.update_positions(portfolio.positions());
331 if let Some(state) = persisted.strategy_state.take() {
333 info!("restoring strategy state from persistence");
334 strategy
335 .restore(state)
336 .context("failed to restore strategy state")?;
337 }
338 persisted.portfolio = Some(portfolio.snapshot());
339
340 let mut market = HashMap::new();
341 for symbol in &symbols {
342 let mut snapshot = MarketSnapshot::default();
343 if let Some(price) = persisted.last_prices.get(symbol).copied() {
344 snapshot.last_trade = Some(price);
345 }
346 market.insert(symbol.clone(), snapshot);
347 }
348
349 let metrics = LiveMetrics::new();
350 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
351 if let Some(flag) = &private_connection {
352 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
353 }
354 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
355 let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
356 let alerts = AlertManager::new(
357 settings.alerting,
358 dispatcher,
359 Some(public_connection.clone()),
360 private_connection.clone(),
361 );
362 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
363 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
364 let alerts = Arc::new(alerts);
365 let alert_task = alerts.spawn_watchdog();
366 let metrics = Arc::new(metrics);
367 let mut connection_monitors = Vec::new();
368 connection_monitors.push(spawn_connection_monitor(
369 shutdown.clone(),
370 public_connection.clone(),
371 metrics.clone(),
372 "public",
373 ));
374 if let Some(flag) = private_connection.clone() {
375 connection_monitors.push(spawn_connection_monitor(
376 shutdown.clone(),
377 flag,
378 metrics.clone(),
379 "private",
380 ));
381 }
382
383 if !settings.exec_backend.is_paper() {
384 let execution_engine = orchestrator.execution_engine();
385 let bybit_creds = match execution_engine.credentials() {
386 Some(creds) => creds,
387 None => bail!("live execution requires Bybit credentials"),
388 };
389 let ws_url = execution_engine.ws_url();
390 let private_tx = private_event_tx.clone();
391 let exec_client = execution_engine.client();
392 let symbols_for_private = symbols.clone();
393 let last_sync_handle = last_private_sync.clone();
394 let private_connection_flag = private_connection.clone();
395 let metrics_for_private = metrics.clone();
396 tokio::spawn(async move {
397 let creds = bybit_creds;
398 let endpoint = ws_url;
399 let client = exec_client;
400 let symbols = symbols_for_private;
401 let last_sync = last_sync_handle;
402 loop {
403 match tesser_bybit::ws::connect_private(
404 &endpoint,
405 &creds,
406 private_connection_flag.clone(),
407 )
408 .await
409 {
410 Ok(mut socket) => {
411 if let Some(flag) = &private_connection_flag {
412 flag.store(true, Ordering::SeqCst);
413 }
414 metrics_for_private.update_connection_status("private", true);
415 info!("Connected to Bybit private WebSocket stream");
416 for symbol in &symbols {
419 match client.list_open_orders(symbol).await {
420 Ok(orders) => {
421 for order in orders {
422 if let Err(err) = private_tx
423 .send(BrokerEvent::OrderUpdate(order))
424 .await
425 {
426 error!(
427 "failed to send reconciled order update: {err}"
428 );
429 }
430 }
431 }
432 Err(e) => {
433 error!("failed to reconcile open orders for {symbol}: {e}");
434 }
435 }
436 }
437
438 if let Some(bybit) =
440 client.as_any().downcast_ref::<tesser_bybit::BybitClient>()
441 {
442 let since = {
443 let guard = last_sync.lock().await;
444 guard.unwrap_or_else(|| {
446 Utc::now() - chrono::Duration::minutes(30)
447 })
448 };
449 match bybit.list_executions_since(since).await {
450 Ok(fills) => {
451 for fill in fills {
452 if let Err(err) =
453 private_tx.send(BrokerEvent::Fill(fill)).await
454 {
455 error!("failed to send reconciled fill: {err}");
456 }
457 }
458 }
459 Err(e) => {
460 error!(
461 "failed to reconcile executions since {:?}: {}",
462 since, e
463 );
464 }
465 }
466 let mut guard = last_sync.lock().await;
468 *guard = Some(Utc::now());
469 }
470
471 while let Some(msg) = socket.next().await {
472 if let Ok(Message::Text(text)) = msg {
473 if let Ok(value) =
474 serde_json::from_str::<serde_json::Value>(&text)
475 {
476 if let Some(topic) =
477 value.get("topic").and_then(|v| v.as_str())
478 {
479 match topic {
480 "order" => {
481 if let Ok(msg) = serde_json::from_value::<
482 PrivateMessage<BybitWsOrder>,
483 >(
484 value.clone()
485 ) {
486 for update in msg.data {
487 match update.to_tesser_order(None) {
488 Ok(order) => {
489 if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
490 error!("failed to send private order update: {err}");
491 }
492 }
493 Err(err) => error!("failed to convert order update: {err}"),
494 }
495 }
496 }
497 }
498 "execution" => {
499 if let Ok(msg) = serde_json::from_value::<
500 PrivateMessage<BybitWsExecution>,
501 >(
502 value.clone()
503 ) {
504 for exec in msg.data {
505 match exec.to_tesser_fill() {
506 Ok(fill) => {
507 if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
508 error!("failed to send private fill event: {err}");
509 }
510 }
511 Err(err) => error!("failed to parse execution: {err}"),
512 }
513 }
514 }
515 }
516 _ => {}
517 }
518 }
519 }
520 }
521 }
522 }
523 Err(e) => {
524 if let Some(flag) = &private_connection_flag {
525 flag.store(false, Ordering::SeqCst);
526 }
527 metrics_for_private.update_connection_status("private", false);
528 error!("Private WebSocket connection failed: {e}. Retrying...");
529 }
530 }
531 if let Some(flag) = &private_connection_flag {
532 flag.store(false, Ordering::SeqCst);
533 }
534 metrics_for_private.update_connection_status("private", false);
535 tokio::time::sleep(Duration::from_secs(5)).await;
536 }
537 });
538 }
539
540 let strategy = Arc::new(Mutex::new(strategy));
541 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
542 let portfolio = Arc::new(Mutex::new(portfolio));
543 let market = Arc::new(Mutex::new(market));
544 let persisted = Arc::new(Mutex::new(persisted));
545 let orchestrator = Arc::new(orchestrator);
546 let event_bus = Arc::new(EventBus::new(2048));
547 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
548 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
549 client: orchestrator.execution_engine().client(),
550 portfolio: portfolio.clone(),
551 persisted: persisted.clone(),
552 state_repo: state_repo.clone(),
553 alerts: alerts.clone(),
554 metrics: metrics.clone(),
555 reporting_currency: settings.reporting_currency.clone(),
556 threshold: settings.reconciliation_threshold,
557 }))
558 });
559 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
560 spawn_reconciliation_loop(
561 ctx.clone(),
562 shutdown.clone(),
563 settings.reconciliation_interval,
564 )
565 });
566 let subscriber_handles = spawn_event_subscribers(
567 event_bus.clone(),
568 strategy.clone(),
569 strategy_ctx.clone(),
570 orchestrator.clone(),
571 portfolio.clone(),
572 metrics.clone(),
573 alerts.clone(),
574 market.clone(),
575 state_repo.clone(),
576 persisted.clone(),
577 settings.exec_backend,
578 );
579 let order_timeout_task = spawn_order_timeout_monitor(
580 orchestrator.clone(),
581 event_bus.clone(),
582 alerts.clone(),
583 shutdown.clone(),
584 );
585
586 info!(
587 symbols = ?symbols,
588 category = ?settings.category,
589 metrics_addr = %settings.metrics_addr,
590 state_path = %settings.state_path.display(),
591 history = settings.history,
592 "market stream ready"
593 );
594
595 for symbol in &symbols {
596 let ctx = shared_risk_context(symbol, &portfolio, &market, &persisted).await;
597 orchestrator.update_risk_context(symbol.clone(), ctx);
598 }
599
600 Ok(Self {
601 stream,
602 orchestrator,
603 state_repo,
604 persisted,
605 event_bus,
606 shutdown,
607 metrics_task,
608 alert_task,
609 reconciliation_task,
610 reconciliation_ctx,
611 private_event_rx,
612 last_private_sync,
613 subscriber_handles,
614 connection_monitors,
615 order_timeout_task,
616 strategy,
617 _public_connection: public_connection,
618 _private_connection: private_connection,
619 })
620 }
621
622 async fn run(mut self) -> Result<()> {
623 info!("live session started");
624 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
625 perform_state_reconciliation(ctx.as_ref())
626 .await
627 .context("initial state reconciliation failed")?;
628 }
629 let backoff = Duration::from_millis(200);
630 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
631
632 while !self.shutdown.triggered() {
633 let mut progressed = false;
634
635 if let Some(tick) = self.stream.next_tick().await? {
636 progressed = true;
637 self.event_bus.publish(Event::Tick(TickEvent { tick }));
638 }
639
640 if let Some(candle) = self.stream.next_candle().await? {
641 progressed = true;
642 self.event_bus
643 .publish(Event::Candle(CandleEvent { candle }));
644 }
645
646 if let Some(book) = self.stream.next_order_book().await? {
647 progressed = true;
648 self.event_bus
649 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
650 }
651
652 tokio::select! {
653 biased;
654 Some(event) = self.private_event_rx.recv() => {
655 progressed = true;
656 match event {
657 BrokerEvent::OrderUpdate(order) => {
658 info!(
659 order_id = %order.id,
660 status = ?order.status,
661 symbol = %order.request.symbol,
662 "received private order update"
663 );
664 self.event_bus
665 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
666 }
667 BrokerEvent::Fill(fill) => {
668 info!(
669 order_id = %fill.order_id,
670 symbol = %fill.symbol,
671 qty = %fill.fill_quantity,
672 price = %fill.fill_price,
673 "received private fill"
674 );
675 self.event_bus.publish(Event::Fill(FillEvent { fill }));
676 }
677 }
678 }
679 _ = orchestrator_timer.tick() => {
680 if let Err(e) = self.orchestrator.on_timer_tick().await {
682 error!("Orchestrator timer tick failed: {}", e);
683 }
684 }
685 else => {}
686 }
687
688 if !progressed && !self.shutdown.sleep(backoff).await {
689 break;
690 }
691 }
692 info!("live session stopping");
693 self.metrics_task.abort();
694 if let Some(handle) = self.alert_task.take() {
695 handle.abort();
696 }
697 if let Some(handle) = self.reconciliation_task.take() {
698 handle.abort();
699 }
700 self.order_timeout_task.abort();
701 for handle in self.subscriber_handles.drain(..) {
702 handle.abort();
703 }
704 for handle in self.connection_monitors.drain(..) {
705 handle.abort();
706 }
707 if let Err(err) = self.save_state().await {
708 warn!(error = %err, "failed to persist shutdown state");
709 }
710 Ok(())
711 }
712
713 async fn save_state(&self) -> Result<()> {
714 persist_state(
715 self.state_repo.clone(),
716 self.persisted.clone(),
717 Some(self.strategy.clone()),
718 )
719 .await
720 }
721}
722
723struct ReconciliationContext {
724 client: Arc<dyn ExecutionClient>,
725 portfolio: Arc<Mutex<Portfolio>>,
726 persisted: Arc<Mutex<LiveState>>,
727 state_repo: Arc<dyn StateRepository>,
728 alerts: Arc<AlertManager>,
729 metrics: Arc<LiveMetrics>,
730 reporting_currency: Symbol,
731 threshold: Decimal,
732}
733
734struct ReconciliationContextConfig {
735 client: Arc<dyn ExecutionClient>,
736 portfolio: Arc<Mutex<Portfolio>>,
737 persisted: Arc<Mutex<LiveState>>,
738 state_repo: Arc<dyn StateRepository>,
739 alerts: Arc<AlertManager>,
740 metrics: Arc<LiveMetrics>,
741 reporting_currency: Symbol,
742 threshold: Decimal,
743}
744
745impl ReconciliationContext {
746 fn new(config: ReconciliationContextConfig) -> Self {
747 let ReconciliationContextConfig {
748 client,
749 portfolio,
750 persisted,
751 state_repo,
752 alerts,
753 metrics,
754 reporting_currency,
755 threshold,
756 } = config;
757 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
759 min_threshold
760 } else {
761 threshold
762 };
763 Self {
764 client,
765 portfolio,
766 persisted,
767 state_repo,
768 alerts,
769 metrics,
770 reporting_currency,
771 threshold,
772 }
773 }
774}
775
776fn spawn_reconciliation_loop(
777 ctx: Arc<ReconciliationContext>,
778 shutdown: ShutdownSignal,
779 interval: Duration,
780) -> JoinHandle<()> {
781 tokio::spawn(async move {
782 while shutdown.sleep(interval).await {
783 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
784 error!(error = %err, "periodic state reconciliation failed");
785 }
786 }
787 })
788}
789
790async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
791 info!("running state reconciliation");
792 let remote_positions = ctx
793 .client
794 .positions()
795 .await
796 .context("failed to fetch remote positions")?;
797 let remote_balances = ctx
798 .client
799 .account_balances()
800 .await
801 .context("failed to fetch remote balances")?;
802 let (local_positions, local_cash) = {
803 let guard = ctx.portfolio.lock().await;
804 (guard.positions(), guard.cash())
805 };
806
807 let remote_map = positions_to_map(remote_positions);
808 let local_map = positions_to_map(local_positions);
809 let mut tracked_symbols: HashSet<String> = HashSet::new();
810 tracked_symbols.extend(remote_map.keys().cloned());
811 tracked_symbols.extend(local_map.keys().cloned());
812
813 let mut severe_findings = Vec::new();
814 for symbol in tracked_symbols {
815 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
816 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
817 let diff = (local_qty - remote_qty).abs();
818 let diff_value = diff.to_f64().unwrap_or(0.0);
819 ctx.metrics.update_position_diff(&symbol, diff_value);
820 if diff > Decimal::ZERO {
821 warn!(
822 symbol = %symbol,
823 local = %local_qty,
824 remote = %remote_qty,
825 diff = %diff,
826 "position mismatch detected during reconciliation"
827 );
828 let pct = normalize_diff(diff, remote_qty);
829 if pct >= ctx.threshold {
830 error!(
831 symbol = %symbol,
832 local = %local_qty,
833 remote = %remote_qty,
834 diff = %diff,
835 pct = %pct,
836 "position mismatch exceeds threshold"
837 );
838 severe_findings.push(format!(
839 "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
840 ));
841 }
842 }
843 }
844
845 let reporting = ctx.reporting_currency.as_str();
846 let remote_cash = remote_balances
847 .iter()
848 .find(|balance| balance.currency == reporting)
849 .map(|balance| balance.available)
850 .unwrap_or_else(|| Decimal::ZERO);
851 let cash_diff = (remote_cash - local_cash).abs();
852 ctx.metrics
853 .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
854 if cash_diff > Decimal::ZERO {
855 warn!(
856 currency = %reporting,
857 local = %local_cash,
858 remote = %remote_cash,
859 diff = %cash_diff,
860 "balance mismatch detected during reconciliation"
861 );
862 let pct = normalize_diff(cash_diff, remote_cash);
863 if pct >= ctx.threshold {
864 error!(
865 currency = %reporting,
866 local = %local_cash,
867 remote = %remote_cash,
868 diff = %cash_diff,
869 pct = %pct,
870 "balance mismatch exceeds threshold"
871 );
872 severe_findings.push(format!(
873 "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
874 ));
875 }
876 }
877
878 if severe_findings.is_empty() {
879 info!("state reconciliation complete with no critical divergence");
880 return Ok(());
881 }
882
883 let alert_body = severe_findings.join("; ");
884 ctx.alerts
885 .notify("State reconciliation divergence", &alert_body)
886 .await;
887 enforce_liquidate_only(ctx).await;
888 Ok(())
889}
890
891async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
892 let snapshot = {
893 let mut guard = ctx.portfolio.lock().await;
894 if !guard.set_liquidate_only(true) {
895 return;
896 }
897 info!("entering liquidate-only mode due to reconciliation divergence");
898 guard.snapshot()
899 };
900 {
901 let mut state = ctx.persisted.lock().await;
902 state.portfolio = Some(snapshot);
903 }
904 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
905 warn!(error = %err, "failed to persist liquidate-only transition");
906 }
907}
908
909fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
910 let mut map = HashMap::new();
911 for position in positions {
912 map.insert(position.symbol.clone(), position_signed_qty(&position));
913 }
914 map
915}
916
917fn position_signed_qty(position: &Position) -> Decimal {
918 match position.side {
919 Some(Side::Buy) => position.quantity,
920 Some(Side::Sell) => -position.quantity,
921 None => Decimal::ZERO,
922 }
923}
924
925fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
926 if diff <= Decimal::ZERO {
927 Decimal::ZERO
928 } else {
929 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
930 diff / denominator
931 }
932}
933
934#[derive(Default)]
935struct MarketSnapshot {
936 last_trade: Option<Price>,
937 last_trade_ts: Option<DateTime<Utc>>,
938 last_candle: Option<Candle>,
939}
940
941impl MarketSnapshot {
942 fn price(&self) -> Option<Price> {
943 self.last_trade
944 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
945 }
946}
947
948#[derive(Clone)]
949pub struct ShutdownSignal {
950 flag: Arc<AtomicBool>,
951 notify: Arc<Notify>,
952}
953
954impl ShutdownSignal {
955 pub fn new() -> Self {
956 let flag = Arc::new(AtomicBool::new(false));
957 let notify = Arc::new(Notify::new());
958 let flag_clone = flag.clone();
959 let notify_clone = notify.clone();
960 tokio::spawn(async move {
961 if tokio::signal::ctrl_c().await.is_ok() {
962 flag_clone.store(true, Ordering::SeqCst);
963 notify_clone.notify_waiters();
964 }
965 });
966 Self { flag, notify }
967 }
968
969 pub fn trigger(&self) {
970 self.flag.store(true, Ordering::SeqCst);
971 self.notify.notify_waiters();
972 }
973
974 fn triggered(&self) -> bool {
975 self.flag.load(Ordering::SeqCst)
976 }
977
978 async fn sleep(&self, duration: Duration) -> bool {
979 tokio::select! {
980 _ = tokio::time::sleep(duration) => true,
981 _ = self.notify.notified() => false,
982 }
983 }
984}
985
986impl Default for ShutdownSignal {
987 fn default() -> Self {
988 Self::new()
989 }
990}
991
992#[allow(clippy::too_many_arguments)]
993fn spawn_event_subscribers(
994 bus: Arc<EventBus>,
995 strategy: Arc<Mutex<Box<dyn Strategy>>>,
996 strategy_ctx: Arc<Mutex<StrategyContext>>,
997 orchestrator: Arc<OrderOrchestrator>,
998 portfolio: Arc<Mutex<Portfolio>>,
999 metrics: Arc<LiveMetrics>,
1000 alerts: Arc<AlertManager>,
1001 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1002 state_repo: Arc<dyn StateRepository>,
1003 persisted: Arc<Mutex<LiveState>>,
1004 exec_backend: ExecutionBackend,
1005) -> Vec<JoinHandle<()>> {
1006 let mut handles = Vec::new();
1007
1008 let market_bus = bus.clone();
1009 let market_strategy = strategy.clone();
1010 let market_ctx = strategy_ctx.clone();
1011 let market_metrics = metrics.clone();
1012 let market_alerts = alerts.clone();
1013 let market_state = state_repo.clone();
1014 let market_persisted = persisted.clone();
1015 let market_portfolio = portfolio.clone();
1016 let market_snapshot = market.clone();
1017 let orchestrator_clone = orchestrator.clone();
1018 handles.push(tokio::spawn(async move {
1019 let mut stream = market_bus.subscribe();
1020 loop {
1021 match stream.recv().await {
1022 Ok(Event::Tick(evt)) => {
1023 if let Err(err) = process_tick_event(
1024 evt.tick,
1025 market_strategy.clone(),
1026 market_ctx.clone(),
1027 market_metrics.clone(),
1028 market_alerts.clone(),
1029 market_snapshot.clone(),
1030 market_portfolio.clone(),
1031 market_state.clone(),
1032 market_persisted.clone(),
1033 market_bus.clone(),
1034 )
1035 .await
1036 {
1037 warn!(error = %err, "tick handler failed");
1038 }
1039 }
1040 Ok(Event::Candle(evt)) => {
1041 if let Err(err) = process_candle_event(
1042 evt.candle,
1043 market_strategy.clone(),
1044 market_ctx.clone(),
1045 market_metrics.clone(),
1046 market_alerts.clone(),
1047 market_snapshot.clone(),
1048 market_portfolio.clone(),
1049 orchestrator_clone.clone(),
1050 exec_backend,
1051 market_state.clone(),
1052 market_persisted.clone(),
1053 market_bus.clone(),
1054 )
1055 .await
1056 {
1057 warn!(error = %err, "candle handler failed");
1058 }
1059 }
1060 Ok(Event::OrderBook(evt)) => {
1061 if let Err(err) = process_order_book_event(
1062 evt.order_book,
1063 market_strategy.clone(),
1064 market_ctx.clone(),
1065 market_metrics.clone(),
1066 market_alerts.clone(),
1067 market_snapshot.clone(),
1068 market_bus.clone(),
1069 )
1070 .await
1071 {
1072 warn!(error = %err, "order book handler failed");
1073 }
1074 }
1075 Ok(_) => {}
1076 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1077 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1078 warn!(lag = lag, "market subscriber lagged");
1079 continue;
1080 }
1081 }
1082 }
1083 }));
1084
1085 let exec_bus = bus.clone();
1086 let exec_portfolio = portfolio.clone();
1087 let exec_market = market.clone();
1088 let exec_persisted = persisted.clone();
1089 let exec_alerts = alerts.clone();
1090 let exec_metrics = metrics.clone();
1091 let exec_orchestrator = orchestrator.clone();
1092 handles.push(tokio::spawn(async move {
1093 let orchestrator = exec_orchestrator.clone();
1094 let mut stream = exec_bus.subscribe();
1095 loop {
1096 match stream.recv().await {
1097 Ok(Event::Signal(evt)) => {
1098 if let Err(err) = process_signal_event(
1099 evt.signal,
1100 orchestrator.clone(),
1101 exec_portfolio.clone(),
1102 exec_market.clone(),
1103 exec_persisted.clone(),
1104 exec_alerts.clone(),
1105 exec_metrics.clone(),
1106 )
1107 .await
1108 {
1109 warn!(error = %err, "signal handler failed");
1110 }
1111 }
1112 Ok(_) => {}
1113 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1114 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1115 warn!(lag = lag, "signal subscriber lagged");
1116 continue;
1117 }
1118 }
1119 }
1120 }));
1121
1122 let fill_bus = bus.clone();
1123 let fill_state = state_repo.clone();
1124 let fill_orchestrator = orchestrator.clone();
1125 let fill_persisted = persisted.clone();
1126 let fill_alerts = alerts.clone();
1127 handles.push(tokio::spawn(async move {
1128 let orchestrator = fill_orchestrator.clone();
1129 let persisted = fill_persisted.clone();
1130 let mut stream = fill_bus.subscribe();
1131 loop {
1132 match stream.recv().await {
1133 Ok(Event::Fill(evt)) => {
1134 if let Err(err) = process_fill_event(
1135 evt.fill,
1136 portfolio.clone(),
1137 strategy.clone(),
1138 strategy_ctx.clone(),
1139 orchestrator.clone(),
1140 metrics.clone(),
1141 fill_alerts.clone(),
1142 fill_state.clone(),
1143 persisted.clone(),
1144 )
1145 .await
1146 {
1147 warn!(error = %err, "fill handler failed");
1148 }
1149 }
1150 Ok(_) => {}
1151 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1152 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1153 warn!(lag = lag, "fill subscriber lagged");
1154 continue;
1155 }
1156 }
1157 }
1158 }));
1159
1160 let order_bus = bus.clone();
1161 let order_persisted = persisted.clone();
1162 let order_alerts = alerts.clone();
1163 let order_orchestrator = orchestrator.clone();
1164 handles.push(tokio::spawn(async move {
1167 let orchestrator = order_orchestrator.clone();
1168 let persisted = order_persisted.clone();
1169 let mut stream = order_bus.subscribe();
1170 loop {
1171 match stream.recv().await {
1172 Ok(Event::OrderUpdate(evt)) => {
1173 if let Err(err) = process_order_update_event(
1174 evt.order,
1175 orchestrator.clone(),
1176 order_alerts.clone(),
1177 state_repo.clone(),
1178 persisted.clone(),
1179 )
1180 .await
1181 {
1182 warn!(error = %err, "order update handler failed");
1183 }
1184 }
1185 Ok(_) => {}
1186 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1187 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1188 warn!(lag = lag, "order subscriber lagged");
1189 continue;
1190 }
1191 }
1192 }
1193 }));
1194
1195 handles
1196}
1197
1198#[allow(clippy::too_many_arguments)]
1199async fn process_tick_event(
1200 tick: Tick,
1201 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1202 strategy_ctx: Arc<Mutex<StrategyContext>>,
1203 metrics: Arc<LiveMetrics>,
1204 alerts: Arc<AlertManager>,
1205 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1206 portfolio: Arc<Mutex<Portfolio>>,
1207 state_repo: Arc<dyn StateRepository>,
1208 persisted: Arc<Mutex<LiveState>>,
1209 bus: Arc<EventBus>,
1210) -> Result<()> {
1211 metrics.inc_tick();
1212 metrics.update_staleness(0.0);
1213 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1214 alerts.heartbeat().await;
1215 {
1216 let mut guard = market.lock().await;
1217 if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1218 snapshot.last_trade = Some(tick.price);
1219 snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1220 }
1221 }
1222 let mut drawdown_triggered = false;
1223 let mut snapshot_on_trigger = None;
1224 {
1225 let mut guard = portfolio.lock().await;
1226 let was_liquidate_only = guard.liquidate_only();
1227 match guard.update_market_data(&tick.symbol, tick.price) {
1228 Ok(_) => {
1229 if !was_liquidate_only && guard.liquidate_only() {
1230 drawdown_triggered = true;
1231 snapshot_on_trigger = Some(guard.snapshot());
1232 }
1233 }
1234 Err(err) => {
1235 warn!(
1236 symbol = %tick.symbol,
1237 error = %err,
1238 "failed to refresh market data"
1239 );
1240 }
1241 }
1242 }
1243 {
1244 let mut state = persisted.lock().await;
1245 state.last_prices.insert(tick.symbol.clone(), tick.price);
1246 if drawdown_triggered {
1247 if let Some(snapshot) = snapshot_on_trigger.take() {
1248 state.portfolio = Some(snapshot);
1249 }
1250 }
1251 }
1252 if drawdown_triggered {
1253 persist_state(
1254 state_repo.clone(),
1255 persisted.clone(),
1256 Some(strategy.clone()),
1257 )
1258 .await?;
1259 alert_liquidate_only(alerts.clone()).await;
1260 }
1261 {
1262 let mut ctx = strategy_ctx.lock().await;
1263 ctx.push_tick(tick.clone());
1264 let lock_start = Instant::now();
1265 let mut strat = strategy.lock().await;
1266 log_strategy_lock("tick", lock_start.elapsed());
1267 let call_start = Instant::now();
1268 strat
1269 .on_tick(&ctx, &tick)
1270 .await
1271 .context("strategy failure on tick event")?;
1272 log_strategy_call("tick", call_start.elapsed());
1273 }
1274 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1275 Ok(())
1276}
1277
1278#[allow(clippy::too_many_arguments)]
1279async fn process_candle_event(
1280 candle: Candle,
1281 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1282 strategy_ctx: Arc<Mutex<StrategyContext>>,
1283 metrics: Arc<LiveMetrics>,
1284 alerts: Arc<AlertManager>,
1285 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1286 portfolio: Arc<Mutex<Portfolio>>,
1287 orchestrator: Arc<OrderOrchestrator>,
1288 exec_backend: ExecutionBackend,
1289 state_repo: Arc<dyn StateRepository>,
1290 persisted: Arc<Mutex<LiveState>>,
1291 bus: Arc<EventBus>,
1292) -> Result<()> {
1293 metrics.inc_candle();
1294 metrics.update_staleness(0.0);
1295 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1296 alerts.heartbeat().await;
1297 metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1298 {
1299 let mut guard = market.lock().await;
1300 if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1301 snapshot.last_candle = Some(candle.clone());
1302 snapshot.last_trade = Some(candle.close);
1303 }
1304 }
1305 if exec_backend.is_paper() {
1306 let client = orchestrator.execution_engine().client();
1307 if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1308 paper.update_price(&candle.symbol, candle.close);
1309 }
1310 }
1311 let mut candle_drawdown_triggered = false;
1312 let mut candle_snapshot = None;
1313 {
1314 let mut guard = portfolio.lock().await;
1315 let was_liquidate_only = guard.liquidate_only();
1316 match guard.update_market_data(&candle.symbol, candle.close) {
1317 Ok(_) => {
1318 if !was_liquidate_only && guard.liquidate_only() {
1319 candle_drawdown_triggered = true;
1320 candle_snapshot = Some(guard.snapshot());
1321 }
1322 }
1323 Err(err) => {
1324 warn!(
1325 symbol = %candle.symbol,
1326 error = %err,
1327 "failed to refresh market data"
1328 );
1329 }
1330 }
1331 }
1332 if candle_drawdown_triggered {
1333 if let Some(snapshot) = candle_snapshot.take() {
1334 let mut persisted_guard = persisted.lock().await;
1335 persisted_guard.portfolio = Some(snapshot);
1336 }
1337 alert_liquidate_only(alerts.clone()).await;
1338 }
1339 {
1340 let mut ctx = strategy_ctx.lock().await;
1341 ctx.push_candle(candle.clone());
1342 let lock_start = Instant::now();
1343 let mut strat = strategy.lock().await;
1344 log_strategy_lock("candle", lock_start.elapsed());
1345 let call_start = Instant::now();
1346 strat
1347 .on_candle(&ctx, &candle)
1348 .await
1349 .context("strategy failure on candle event")?;
1350 log_strategy_call("candle", call_start.elapsed());
1351 }
1352 {
1353 let mut snapshot = persisted.lock().await;
1354 snapshot.last_candle_ts = Some(candle.timestamp);
1355 snapshot
1356 .last_prices
1357 .insert(candle.symbol.clone(), candle.close);
1358 }
1359 persist_state(
1360 state_repo.clone(),
1361 persisted.clone(),
1362 Some(strategy.clone()),
1363 )
1364 .await?;
1365 let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1366 orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1367 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1368 Ok(())
1369}
1370
1371async fn process_order_book_event(
1372 book: OrderBook,
1373 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1374 strategy_ctx: Arc<Mutex<StrategyContext>>,
1375 metrics: Arc<LiveMetrics>,
1376 alerts: Arc<AlertManager>,
1377 _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1378 bus: Arc<EventBus>,
1379) -> Result<()> {
1380 metrics.update_staleness(0.0);
1381 alerts.heartbeat().await;
1382 {
1383 let mut ctx = strategy_ctx.lock().await;
1384 ctx.push_order_book(book.clone());
1385 let lock_start = Instant::now();
1386 let mut strat = strategy.lock().await;
1387 log_strategy_lock("order_book", lock_start.elapsed());
1388 let call_start = Instant::now();
1389 strat
1390 .on_order_book(&ctx, &book)
1391 .await
1392 .context("strategy failure on order book")?;
1393 log_strategy_call("order_book", call_start.elapsed());
1394 }
1395 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1396 Ok(())
1397}
1398
1399async fn process_signal_event(
1400 signal: Signal,
1401 orchestrator: Arc<OrderOrchestrator>,
1402 portfolio: Arc<Mutex<Portfolio>>,
1403 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1404 persisted: Arc<Mutex<LiveState>>,
1405 alerts: Arc<AlertManager>,
1406 metrics: Arc<LiveMetrics>,
1407) -> Result<()> {
1408 let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1409 orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1410 match orchestrator.on_signal(&signal, &ctx).await {
1411 Ok(()) => {
1412 alerts.reset_order_failures().await;
1413 }
1414 Err(err) => {
1415 metrics.inc_order_failure();
1416 alerts
1417 .order_failure(&format!("orchestrator error: {err}"))
1418 .await;
1419 }
1420 }
1421 Ok(())
1422}
1423
1424fn log_strategy_lock(event: &str, wait: Duration) {
1425 let wait_ms = wait.as_secs_f64() * 1000.0;
1426 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1427 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1428 } else {
1429 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1430 }
1431}
1432
1433fn log_strategy_call(event: &str, elapsed: Duration) {
1434 let duration_ms = elapsed.as_secs_f64() * 1000.0;
1435 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1436 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1437 } else {
1438 trace!(target: "strategy", event, duration_ms, "strategy call completed");
1439 }
1440}
1441
1442#[allow(clippy::too_many_arguments)]
1443async fn process_fill_event(
1444 fill: Fill,
1445 portfolio: Arc<Mutex<Portfolio>>,
1446 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1447 strategy_ctx: Arc<Mutex<StrategyContext>>,
1448 orchestrator: Arc<OrderOrchestrator>,
1449 metrics: Arc<LiveMetrics>,
1450 alerts: Arc<AlertManager>,
1451 state_repo: Arc<dyn StateRepository>,
1452 persisted: Arc<Mutex<LiveState>>,
1453) -> Result<()> {
1454 let mut drawdown_triggered = false;
1455 {
1456 let mut guard = portfolio.lock().await;
1457 let was_liquidate_only = guard.liquidate_only();
1458 guard
1459 .apply_fill(&fill)
1460 .context("Failed to apply fill to portfolio")?;
1461 if !was_liquidate_only && guard.liquidate_only() {
1462 drawdown_triggered = true;
1463 }
1464 let snapshot = guard.snapshot();
1465 let mut persisted_guard = persisted.lock().await;
1466 persisted_guard.portfolio = Some(snapshot);
1467 }
1468 {
1469 let positions = {
1470 let guard = portfolio.lock().await;
1471 guard.positions()
1472 };
1473 let mut ctx = strategy_ctx.lock().await;
1474 ctx.update_positions(positions);
1475 }
1476 orchestrator.on_fill(&fill).await.ok();
1477 {
1478 let ctx = strategy_ctx.lock().await;
1479 let lock_start = Instant::now();
1480 let mut strat = strategy.lock().await;
1481 log_strategy_lock("fill", lock_start.elapsed());
1482 let call_start = Instant::now();
1483 strat
1484 .on_fill(&ctx, &fill)
1485 .await
1486 .context("Strategy failed on fill event")?;
1487 log_strategy_call("fill", call_start.elapsed());
1488 }
1489 let equity = {
1490 let guard = portfolio.lock().await;
1491 guard.equity()
1492 };
1493 if let Some(value) = equity.to_f64() {
1494 metrics.update_equity(value);
1495 }
1496 alerts.update_equity(equity).await;
1497 metrics.inc_order();
1498 alerts
1499 .notify(
1500 "Order Filled",
1501 &format!(
1502 "order filled: {}@{} ({})",
1503 fill.fill_quantity,
1504 fill.fill_price,
1505 match fill.side {
1506 Side::Buy => "buy",
1507 Side::Sell => "sell",
1508 }
1509 ),
1510 )
1511 .await;
1512 if drawdown_triggered {
1513 alert_liquidate_only(alerts.clone()).await;
1514 }
1515 persist_state(
1516 state_repo.clone(),
1517 persisted.clone(),
1518 Some(strategy.clone()),
1519 )
1520 .await?;
1521 Ok(())
1522}
1523
1524async fn process_order_update_event(
1525 order: Order,
1526 orchestrator: Arc<OrderOrchestrator>,
1527 alerts: Arc<AlertManager>,
1528 state_repo: Arc<dyn StateRepository>,
1529 persisted: Arc<Mutex<LiveState>>,
1530) -> Result<()> {
1531 orchestrator.on_order_update(&order);
1532 if matches!(order.status, OrderStatus::Rejected) {
1533 error!(
1534 order_id = %order.id,
1535 symbol = %order.request.symbol,
1536 "order rejected by exchange"
1537 );
1538 alerts.order_failure("order rejected by exchange").await;
1539 alerts
1540 .notify(
1541 "Order rejected",
1542 &format!(
1543 "Order {} for {} was rejected",
1544 order.id, order.request.symbol
1545 ),
1546 )
1547 .await;
1548 }
1549 {
1550 let mut snapshot = persisted.lock().await;
1551 let mut found = false;
1552 for existing in &mut snapshot.open_orders {
1553 if existing.id == order.id {
1554 *existing = order.clone();
1555 found = true;
1556 break;
1557 }
1558 }
1559 if !found {
1560 snapshot.open_orders.push(order.clone());
1561 }
1562 if matches!(
1563 order.status,
1564 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1565 ) {
1566 snapshot.open_orders.retain(|o| o.id != order.id);
1567 }
1568 }
1569 persist_state(state_repo, persisted, None).await?;
1570 Ok(())
1571}
1572
1573async fn emit_signals(
1574 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1575 bus: Arc<EventBus>,
1576 metrics: Arc<LiveMetrics>,
1577) {
1578 let signals = {
1579 let mut strat = strategy.lock().await;
1580 strat.drain_signals()
1581 };
1582 if signals.is_empty() {
1583 return;
1584 }
1585 metrics.inc_signals(signals.len());
1586 for signal in signals {
1587 bus.publish(Event::Signal(SignalEvent { signal }));
1588 }
1589}
1590
1591async fn persist_state(
1592 repo: Arc<dyn StateRepository>,
1593 persisted: Arc<Mutex<LiveState>>,
1594 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1595) -> Result<()> {
1596 if let Some(strat_lock) = strategy {
1597 let strat = strat_lock.lock().await;
1599 if let Ok(json_state) = strat.snapshot() {
1600 let mut guard = persisted.lock().await;
1601 guard.strategy_state = Some(json_state);
1602 } else {
1603 warn!("failed to snapshot strategy state");
1604 }
1605 }
1606
1607 let snapshot = {
1608 let guard = persisted.lock().await;
1609 guard.clone()
1610 };
1611 tokio::task::spawn_blocking(move || repo.save(&snapshot))
1612 .await
1613 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1614 .map_err(|err| anyhow!(err.to_string()))
1615}
1616
1617async fn shared_risk_context(
1618 symbol: &str,
1619 portfolio: &Arc<Mutex<Portfolio>>,
1620 market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1621 persisted: &Arc<Mutex<LiveState>>,
1622) -> RiskContext {
1623 let (signed_qty, equity, liquidate_only) = {
1624 let guard = portfolio.lock().await;
1625 (
1626 guard.signed_position_qty(symbol),
1627 guard.equity(),
1628 guard.liquidate_only(),
1629 )
1630 };
1631 let observed_price = {
1632 let guard = market.lock().await;
1633 guard.get(symbol).and_then(|snapshot| snapshot.price())
1634 };
1635 let last_price = if let Some(price) = observed_price {
1636 price
1637 } else {
1638 let guard = persisted.lock().await;
1639 guard
1640 .last_prices
1641 .get(symbol)
1642 .copied()
1643 .unwrap_or(Decimal::ZERO)
1644 };
1645 RiskContext {
1646 signed_position_qty: signed_qty,
1647 portfolio_equity: equity,
1648 last_price,
1649 liquidate_only,
1650 }
1651}
1652
1653async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1654 alerts
1655 .notify(
1656 "Max drawdown triggered",
1657 "Portfolio entered liquidate-only mode; new exposure blocked until review",
1658 )
1659 .await;
1660}
1661
1662fn spawn_connection_monitor(
1663 shutdown: ShutdownSignal,
1664 flag: Arc<AtomicBool>,
1665 metrics: Arc<LiveMetrics>,
1666 stream: &'static str,
1667) -> JoinHandle<()> {
1668 tokio::spawn(async move {
1669 loop {
1670 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1671 if !shutdown.sleep(Duration::from_secs(5)).await {
1672 break;
1673 }
1674 }
1675 })
1676}
1677
1678fn spawn_order_timeout_monitor(
1679 orchestrator: Arc<OrderOrchestrator>,
1680 bus: Arc<EventBus>,
1681 alerts: Arc<AlertManager>,
1682 shutdown: ShutdownSignal,
1683) -> JoinHandle<()> {
1684 tokio::spawn(async move {
1685 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1686 loop {
1687 ticker.tick().await;
1688 if shutdown.triggered() {
1689 break;
1690 }
1691 match orchestrator.poll_stale_orders().await {
1692 Ok(updates) => {
1693 for order in updates {
1694 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1695 let message = format!(
1696 "Order {} for {} timed out after {}s",
1697 order.id,
1698 order.request.symbol,
1699 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1700 );
1701 error!(%message);
1702 alerts.order_failure(&message).await;
1703 alerts.notify("Order timeout", &message).await;
1704 }
1705 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1706 }
1707 }
1708 Err(err) => {
1709 warn!(error = %err, "order timeout monitor failed");
1710 }
1711 }
1712 }
1713 })
1714}
1715
1716async fn load_market_registry(
1717 client: Arc<dyn ExecutionClient>,
1718 settings: &LiveSessionSettings,
1719) -> Result<Arc<MarketRegistry>> {
1720 if let Some(path) = &settings.markets_file {
1721 let registry = MarketRegistry::load_from_file(path)
1722 .with_context(|| format!("failed to load markets from {}", path.display()))?;
1723 return Ok(Arc::new(registry));
1724 }
1725
1726 if settings.exec_backend.is_paper() {
1727 return Err(anyhow!(
1728 "paper execution requires --markets-file when exchange metadata is unavailable"
1729 ));
1730 }
1731
1732 let instruments = client
1733 .list_instruments(settings.category.as_path())
1734 .await
1735 .context("failed to fetch instruments from execution client")?;
1736 let registry =
1737 MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1738 Ok(Arc::new(registry))
1739}