1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, AtomicI64, Ordering},
6 Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23 static INIT: Once = Once::new();
24 INIT.call_once(|| {
25 register_connector_factory(Arc::new(PaperFactory::default()));
26 #[cfg(feature = "bybit")]
27 register_bybit_factory();
28 #[cfg(feature = "binance")]
29 register_binance_factory();
30 });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35 fill_from_update, order_from_update, register_factory as register_binance_factory,
36 ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37 BinanceClient,
38};
39use tesser_broker::{
40 get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41 ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
48use tesser_core::{
49 Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
50 Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55 TickEvent,
56};
57use tesser_execution::{
58 BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
59 RiskContext, RiskLimits, SqliteAlgoStateRepository,
60};
61use tesser_markets::MarketRegistry;
62use tesser_paper::{PaperExecutionClient, PaperFactory};
63use tesser_portfolio::{
64 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
65};
66use tesser_strategy::{Strategy, StrategyContext};
67
68use crate::alerts::{AlertDispatcher, AlertManager};
69use crate::control;
70use crate::telemetry::{spawn_metrics_server, LiveMetrics};
71use crate::PublicChannel;
72
/// Event delivered to the runtime over the private broker channel.
#[derive(Debug)]
pub enum BrokerEvent {
 /// An order state change; the run loop republishes it as `Event::OrderUpdate`.
 OrderUpdate(Order),
 /// An execution report; the run loop republishes it as `Event::Fill`.
 Fill(Fill),
}
79
/// Selects how orders are executed during a live session.
///
/// Exposed as a CLI value enum; variant names are rendered in kebab-case.
#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum ExecutionBackend {
 /// Paper execution: no exchange state sync, private stream, or reconciliation is started.
 Paper,
 /// Live execution through the execution client created by the selected connector factory.
 Live,
}
86
87impl ExecutionBackend {
88 fn is_paper(self) -> bool {
89 matches!(self, Self::Paper)
90 }
91}
92
/// Order book depth used when the caller does not specify one.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order book depth (`DEFAULT_ORDER_BOOK_DEPTH`).
pub const fn default_order_book_depth() -> usize {
 DEFAULT_ORDER_BOOK_DEPTH
}
// NOTE(review): the two thresholds below are not referenced in the visible part of
// the file; presumably they bound how long acquiring the strategy lock / running a
// strategy callback may take before a warning is logged — confirm at the use sites.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
100
/// Source of public market data consumed by the live run loop.
///
/// Each method yields at most one item per call; the run loop treats `Ok(None)`
/// as "nothing available right now" and an `Err` as fatal for the session.
// NOTE(review): whether implementations block while waiting for data is not
// visible here — confirm against the `ConnectorStream` contract.
#[async_trait::async_trait]
trait LiveMarketStream: Send {
 /// Returns the next trade tick, if any.
 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
 /// Returns the next candle, if any.
 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
 /// Returns the next order book update, if any.
 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
}
107
/// Adapts a connector-provided [`ConnectorStream`] to the runtime's
/// [`LiveMarketStream`] trait.
struct FactoryStreamAdapter {
 /// The wrapped connector stream that all calls are forwarded to.
 inner: Box<dyn ConnectorStream>,
}

impl FactoryStreamAdapter {
 /// Wraps `inner` without performing any I/O.
 fn new(inner: Box<dyn ConnectorStream>) -> Self {
 Self { inner }
 }
}
117
// Pure delegation: every method forwards to the wrapped connector stream and
// returns its result unchanged.
#[async_trait::async_trait]
impl LiveMarketStream for FactoryStreamAdapter {
 /// Forwards to the wrapped stream's `next_tick`.
 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
 self.inner.next_tick().await
 }

 /// Forwards to the wrapped stream's `next_candle`.
 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
 self.inner.next_candle().await
 }

 /// Forwards to the wrapped stream's `next_order_book`.
 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
 self.inner.next_order_book().await
 }
}
132
/// Configuration for a single live (or paper) trading session.
pub struct LiveSessionSettings {
 /// Public channel category; its `as_path()` value is sent to the connector as `"category"`.
 pub category: PublicChannel,
 /// Candle interval passed to the connector stream's `subscribe` call.
 pub interval: Interval,
 /// Fixed order size handed to `FixedOrderSizer`; must be strictly positive.
 pub quantity: Quantity,
 /// Slippage (basis points) passed to the local `PaperExecutionClient` fallback.
 pub slippage_bps: Decimal,
 /// Fee (basis points) passed to the local `PaperExecutionClient` fallback.
 pub fee_bps: Decimal,
 /// History length passed to `StrategyContext::new`.
 pub history: usize,
 /// Address the metrics server is spawned on.
 pub metrics_addr: SocketAddr,
 /// Path of the SQLite state repository; the algo state database uses the same
 /// path with its extension replaced by `algos.db`.
 pub state_path: PathBuf,
 /// Starting balances used for the portfolio configuration.
 pub initial_balances: HashMap<Symbol, Decimal>,
 /// Currency used for portfolio reporting and for the cash reconciliation check.
 pub reporting_currency: Symbol,
 /// Optional markets file.
 // NOTE(review): consumed by `load_market_registry`, which is outside the visible
 // chunk — confirm its exact semantics there.
 pub markets_file: Option<PathBuf>,
 /// Alerting configuration handed to `AlertManager`; its `webhook_url` feeds the dispatcher.
 pub alerting: AlertingConfig,
 /// Whether orders are executed on paper or live.
 pub exec_backend: ExecutionBackend,
 /// Risk configuration: order/position quantity limits and the maximum drawdown.
 pub risk: RiskManagementConfig,
 /// Delay between periodic state reconciliation runs.
 pub reconciliation_interval: Duration,
 /// Normalized divergence at or above which reconciliation raises an alert.
 pub reconciliation_threshold: Decimal,
 /// Name of the connector driver to look up in the factory registry.
 pub driver: String,
 /// Order book depth forwarded to the connector.
 pub orderbook_depth: usize,
 /// When set, root directory for the Parquet flight recorder.
 pub record_path: Option<PathBuf>,
 /// Address the control plane server is spawned on.
 pub control_addr: SocketAddr,
}
155
156impl LiveSessionSettings {
157 fn risk_limits(&self) -> RiskLimits {
158 RiskLimits {
159 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
160 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
161 }
162 }
163}
164
165pub async fn run_live(
166 strategy: Box<dyn Strategy>,
167 symbols: Vec<String>,
168 exchange: ExchangeConfig,
169 settings: LiveSessionSettings,
170) -> Result<()> {
171 run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
172}
173
174pub async fn run_live_with_shutdown(
176 strategy: Box<dyn Strategy>,
177 symbols: Vec<String>,
178 exchange: ExchangeConfig,
179 settings: LiveSessionSettings,
180 shutdown: ShutdownSignal,
181) -> Result<()> {
182 if symbols.is_empty() {
183 return Err(anyhow!("strategy did not declare any subscriptions"));
184 }
185 if settings.quantity <= Decimal::ZERO {
186 return Err(anyhow!("--quantity must be positive"));
187 }
188
189 let public_connection = Arc::new(AtomicBool::new(false));
190 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
191 Some(Arc::new(AtomicBool::new(false)))
192 } else {
193 None
194 };
195 ensure_builtin_connectors_registered();
196 let connector_payload = build_exchange_payload(&exchange, &settings);
197 let connector_factory = get_connector_factory(&settings.driver)
198 .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
199 let stream_config = ConnectorStreamConfig {
200 ws_url: Some(exchange.ws_url.clone()),
201 metadata: json!({
202 "category": settings.category.as_path(),
203 "symbols": symbols.clone(),
204 "orderbook_depth": settings.orderbook_depth,
205 }),
206 connection_status: Some(public_connection.clone()),
207 };
208 let mut connector_stream = connector_factory
209 .create_market_stream(&connector_payload, stream_config)
210 .await
211 .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
212 connector_stream
213 .subscribe(&symbols, settings.interval)
214 .await
215 .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
216 let market_stream: Box<dyn LiveMarketStream> =
217 Box::new(FactoryStreamAdapter::new(connector_stream));
218
219 let execution_client =
220 build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
221 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
222 if matches!(settings.exec_backend, ExecutionBackend::Live) {
223 info!(
224 rest = %exchange.rest_url,
225 driver = ?settings.driver,
226 "live execution enabled via {:?} REST",
227 settings.driver
228 );
229 }
230 let risk_checker: Arc<dyn PreTradeRiskChecker> =
231 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
232 let execution = ExecutionEngine::new(
233 execution_client.clone(),
234 Box::new(FixedOrderSizer {
235 quantity: settings.quantity,
236 }),
237 risk_checker,
238 );
239
240 let algo_repo_path = settings.state_path.with_extension("algos.db");
242 let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
243
244 let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
246
247 let runtime = LiveRuntime::new(
248 market_stream,
249 strategy,
250 symbols,
251 orchestrator,
252 settings,
253 exchange.ws_url.clone(),
254 market_registry,
255 shutdown,
256 public_connection,
257 private_connection,
258 )
259 .await?;
260 runtime.run().await
261}
262
263async fn build_execution_client(
264 settings: &LiveSessionSettings,
265 connector_factory: Arc<dyn ConnectorFactory>,
266 connector_payload: &Value,
267) -> Result<Arc<dyn ExecutionClient>> {
268 match settings.exec_backend {
269 ExecutionBackend::Paper => {
270 if settings.driver == "paper" {
271 return connector_factory
272 .create_execution_client(connector_payload)
273 .await
274 .map_err(|err| anyhow!("failed to create execution client: {err}"));
275 }
276 Ok(Arc::new(PaperExecutionClient::new(
277 "paper".to_string(),
278 vec!["BTCUSDT".to_string()],
279 settings.slippage_bps,
280 settings.fee_bps,
281 )))
282 }
283 ExecutionBackend::Live => connector_factory
284 .create_execution_client(connector_payload)
285 .await
286 .map_err(|err| anyhow!("failed to create execution client: {err}")),
287 }
288}
289
/// Owns every resource of a running live session: the market stream, shared
/// state, and the handles of all background tasks spawned during construction.
struct LiveRuntime {
 /// Public market data source polled by the run loop.
 market: Box<dyn LiveMarketStream>,
 /// Order orchestrator; receives a timer tick from the run loop.
 orchestrator: Arc<OrderOrchestrator>,
 /// Repository used to persist `persisted` on shutdown.
 state_repo: Arc<dyn StateRepository>,
 /// In-memory copy of the persisted live state.
 persisted: Arc<Mutex<LiveState>>,
 /// Bus on which market and private events are published.
 event_bus: Arc<EventBus>,
 /// Optional flight recorder, flushed on shutdown.
 recorder: Option<ParquetRecorder>,
 /// Control plane server task, awaited on shutdown.
 control_task: Option<JoinHandle<()>>,
 /// Shared shutdown signal that ends the run loop.
 shutdown: ShutdownSignal,
 /// Metrics server task, aborted on shutdown.
 metrics_task: JoinHandle<()>,
 /// Alert watchdog task, aborted on shutdown when present.
 alert_task: Option<JoinHandle<()>>,
 /// Periodic reconciliation task (non-paper backends only).
 reconciliation_task: Option<JoinHandle<()>>,
 /// Context for state reconciliation (non-paper backends only).
 reconciliation_ctx: Option<Arc<ReconciliationContext>>,
 /// Receiving end of the private broker event channel.
 private_event_rx: mpsc::Receiver<BrokerEvent>,
 /// Timestamp shared with the Bybit private stream task; not read by the runtime itself.
 #[allow(dead_code)]
 last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
 /// Event bus subscriber tasks, aborted on shutdown.
 subscriber_handles: Vec<JoinHandle<()>>,
 /// Connection status monitor tasks, aborted on shutdown.
 connection_monitors: Vec<JoinHandle<()>>,
 /// Order timeout monitor task, aborted on shutdown.
 order_timeout_task: JoinHandle<()>,
 /// The strategy, shared with subscriber tasks and persisted on shutdown.
 strategy: Arc<Mutex<Box<dyn Strategy>>>,
 /// Held so the public connection flag lives as long as the runtime.
 _public_connection: Arc<AtomicBool>,
 /// Held so the private connection flag lives as long as the runtime.
 _private_connection: Option<Arc<AtomicBool>>,
}
313
314impl LiveRuntime {
 /// Builds the runtime and spawns all background tasks for a session.
 ///
 /// Steps, in order: load persisted state (falling back to defaults on failure),
 /// optionally synchronize positions/balances/open orders from the exchange for
 /// live execution, construct the portfolio, restore strategy state, start
 /// metrics/alerts/connection monitors, start the driver-specific private stream,
 /// start the optional flight recorder, the control plane, reconciliation, event
 /// subscribers and the order timeout monitor, and seed the orchestrator's risk
 /// context for every symbol.
 ///
 /// # Errors
 ///
 /// Fails when exchange state cannot be fetched (live only), when strategy state
 /// cannot be restored, or when the private stream cannot be set up for the
 /// configured driver.
 #[allow(clippy::too_many_arguments)]
 async fn new(
 market: Box<dyn LiveMarketStream>,
 mut strategy: Box<dyn Strategy>,
 symbols: Vec<String>,
 orchestrator: OrderOrchestrator,
 settings: LiveSessionSettings,
 #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
 market_registry: Arc<MarketRegistry>,
 shutdown: ShutdownSignal,
 public_connection: Arc<AtomicBool>,
 private_connection: Option<Arc<AtomicBool>>,
 ) -> Result<Self> {
 let mut strategy_ctx = StrategyContext::new(settings.history);
 let state_repo: Arc<dyn StateRepository> =
 Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
 // Load persisted state on a blocking thread; any failure is logged and the
 // session starts from `LiveState::default()` instead of aborting.
 let mut persisted = match tokio::task::spawn_blocking({
 let repo = state_repo.clone();
 move || repo.load()
 })
 .await
 {
 Ok(Ok(state)) => state,
 Ok(Err(err)) => {
 warn!(error = %err, "failed to load live state; starting from defaults");
 LiveState::default()
 }
 Err(err) => {
 warn!(error = %err, "state load task failed; starting from defaults");
 LiveState::default()
 }
 };
 // For live execution the exchange is the source of truth: fetch positions,
 // balances and open orders, and overwrite the persisted open orders.
 let mut live_bootstrap = None;
 if matches!(settings.exec_backend, ExecutionBackend::Live) {
 info!("Synchronizing portfolio state from exchange");
 let execution_client = orchestrator.execution_engine().client();
 let positions = execution_client
 .positions()
 .await
 .context("failed to fetch remote positions")?;
 let balances = execution_client
 .account_balances()
 .await
 .context("failed to fetch remote account balances")?;
 let mut open_orders = Vec::new();
 for symbol in &symbols {
 let mut symbol_orders = execution_client
 .list_open_orders(symbol)
 .await
 .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
 open_orders.append(&mut symbol_orders);
 }
 persisted.open_orders = open_orders;
 live_bootstrap = Some((positions, balances));
 }

 let portfolio_cfg = PortfolioConfig {
 initial_balances: settings.initial_balances.clone(),
 reporting_currency: settings.reporting_currency.clone(),
 max_drawdown: Some(settings.risk.max_drawdown),
 };
 // Portfolio source priority: exchange snapshot, then persisted snapshot,
 // then a fresh portfolio from the configured initial balances.
 let portfolio = if let Some((positions, balances)) = live_bootstrap {
 Portfolio::from_exchange_state(
 positions,
 balances,
 portfolio_cfg.clone(),
 market_registry.clone(),
 )
 } else if let Some(snapshot) = persisted.portfolio.take() {
 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
 } else {
 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
 };
 strategy_ctx.update_positions(portfolio.positions());
 if let Some(state) = persisted.strategy_state.take() {
 info!("restoring strategy state from persistence");
 strategy
 .restore(state)
 .context("failed to restore strategy state")?;
 }
 persisted.portfolio = Some(portfolio.snapshot());

 // Seed one market snapshot per symbol, reusing the last persisted price when known.
 let mut market_snapshots = HashMap::new();
 for symbol in &symbols {
 let mut snapshot = MarketSnapshot::default();
 if let Some(price) = persisted.last_prices.get(symbol).copied() {
 snapshot.last_trade = Some(price);
 }
 market_snapshots.insert(symbol.clone(), snapshot);
 }

 // Metrics, alerting, and connection monitoring.
 let metrics = LiveMetrics::new();
 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
 if let Some(flag) = &private_connection {
 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
 }
 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
 let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
 let alerts = AlertManager::new(
 settings.alerting,
 dispatcher,
 Some(public_connection.clone()),
 private_connection.clone(),
 );
 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
 // NOTE(review): the private-sync timestamp is seeded from the last persisted
 // candle timestamp — confirm this is the intended starting point.
 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
 let alerts = Arc::new(alerts);
 let alert_task = alerts.spawn_watchdog();
 let metrics = Arc::new(metrics);
 let mut connection_monitors = Vec::new();
 connection_monitors.push(spawn_connection_monitor(
 shutdown.clone(),
 public_connection.clone(),
 metrics.clone(),
 "public",
 ));
 if let Some(flag) = private_connection.clone() {
 connection_monitors.push(spawn_connection_monitor(
 shutdown.clone(),
 flag,
 metrics.clone(),
 "private",
 ));
 }

 // Private (account) stream: only for non-paper backends, selected by driver.
 // An empty driver name is treated as Bybit.
 if !settings.exec_backend.is_paper() {
 let execution_engine = orchestrator.execution_engine();
 let exec_client = execution_engine.client();
 match settings.driver.as_str() {
 "bybit" | "" => {
 #[cfg(feature = "bybit")]
 {
 // The Bybit stream needs the concrete client for credentials and WS URL.
 let bybit = exec_client
 .as_ref()
 .as_any()
 .downcast_ref::<BybitClient>()
 .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
 let creds = bybit
 .get_credentials()
 .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
 spawn_bybit_private_stream(
 creds,
 bybit.get_ws_url(),
 private_event_tx.clone(),
 exec_client.clone(),
 symbols.clone(),
 last_private_sync.clone(),
 private_connection.clone(),
 metrics.clone(),
 shutdown.clone(),
 );
 }
 #[cfg(not(feature = "bybit"))]
 {
 bail!("driver 'bybit' is unavailable without the 'bybit' feature");
 }
 }
 "binance" => {
 #[cfg(feature = "binance")]
 {
 spawn_binance_private_stream(
 exec_client.clone(),
 exchange_ws_url.clone(),
 private_event_tx.clone(),
 private_connection.clone(),
 metrics.clone(),
 shutdown.clone(),
 );
 }
 #[cfg(not(feature = "binance"))]
 {
 bail!("driver 'binance' is unavailable without the 'binance' feature");
 }
 }
 "paper" => {}
 other => {
 bail!("private stream unsupported for driver '{other}'");
 }
 }
 }

 // Optional flight recorder; a start-up failure is logged and recording is disabled.
 let recorder = if let Some(record_path) = settings.record_path.clone() {
 let config = RecorderConfig {
 root: record_path.clone(),
 ..RecorderConfig::default()
 };
 match ParquetRecorder::spawn(config).await {
 Ok(recorder) => {
 info!(path = %record_path.display(), "flight recorder enabled");
 Some(recorder)
 }
 Err(err) => {
 warn!(
 error = %err,
 path = %record_path.display(),
 "failed to start flight recorder"
 );
 None
 }
 }
 } else {
 None
 };
 let recorder_handle = recorder.as_ref().map(|rec| rec.handle());

 // Wrap shared state so it can be handed to the background tasks below.
 let strategy = Arc::new(Mutex::new(strategy));
 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
 let portfolio = Arc::new(Mutex::new(portfolio));
 let market_cache = Arc::new(Mutex::new(market_snapshots));
 let persisted = Arc::new(Mutex::new(persisted));
 let orchestrator = Arc::new(orchestrator);
 let event_bus = Arc::new(EventBus::new(2048));
 let last_data_timestamp = Arc::new(AtomicI64::new(0));
 let control_task = control::spawn_control_plane(
 settings.control_addr,
 portfolio.clone(),
 orchestrator.clone(),
 persisted.clone(),
 last_data_timestamp.clone(),
 shutdown.clone(),
 );
 // Reconciliation against the exchange is only set up for non-paper backends.
 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
 client: orchestrator.execution_engine().client(),
 portfolio: portfolio.clone(),
 persisted: persisted.clone(),
 state_repo: state_repo.clone(),
 alerts: alerts.clone(),
 metrics: metrics.clone(),
 reporting_currency: settings.reporting_currency.clone(),
 threshold: settings.reconciliation_threshold,
 }))
 });
 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
 spawn_reconciliation_loop(
 ctx.clone(),
 shutdown.clone(),
 settings.reconciliation_interval,
 )
 });
 let subscriber_handles = spawn_event_subscribers(
 event_bus.clone(),
 strategy.clone(),
 strategy_ctx.clone(),
 orchestrator.clone(),
 portfolio.clone(),
 metrics.clone(),
 alerts.clone(),
 market_cache.clone(),
 state_repo.clone(),
 persisted.clone(),
 settings.exec_backend,
 recorder_handle.clone(),
 last_data_timestamp.clone(),
 );
 let order_timeout_task = spawn_order_timeout_monitor(
 orchestrator.clone(),
 event_bus.clone(),
 alerts.clone(),
 shutdown.clone(),
 );

 info!(
 symbols = ?symbols,
 category = ?settings.category,
 metrics_addr = %settings.metrics_addr,
 state_path = %settings.state_path.display(),
 history = settings.history,
 "market stream ready"
 );

 // Seed the orchestrator with an initial risk context for every symbol.
 for symbol in &symbols {
 let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
 orchestrator.update_risk_context(symbol.clone(), ctx);
 }

 Ok(Self {
 market,
 orchestrator,
 state_repo,
 persisted,
 event_bus,
 recorder,
 control_task: Some(control_task),
 shutdown,
 metrics_task,
 alert_task,
 reconciliation_task,
 reconciliation_ctx,
 private_event_rx,
 last_private_sync,
 subscriber_handles,
 connection_monitors,
 order_timeout_task,
 strategy,
 _public_connection: public_connection,
 _private_connection: private_connection,
 })
 }
615
616 async fn run(mut self) -> Result<()> {
617 info!("live session started");
618 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
619 perform_state_reconciliation(ctx.as_ref())
620 .await
621 .context("initial state reconciliation failed")?;
622 }
623 let backoff = Duration::from_millis(200);
624 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
625
626 while !self.shutdown.triggered() {
627 let mut progressed = false;
628
629 if let Some(tick) = self.market.next_tick().await? {
630 progressed = true;
631 self.event_bus.publish(Event::Tick(TickEvent { tick }));
632 }
633
634 if let Some(candle) = self.market.next_candle().await? {
635 progressed = true;
636 self.event_bus
637 .publish(Event::Candle(CandleEvent { candle }));
638 }
639
640 if let Some(book) = self.market.next_order_book().await? {
641 progressed = true;
642 self.event_bus
643 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
644 }
645
646 tokio::select! {
647 biased;
648 Some(event) = self.private_event_rx.recv() => {
649 progressed = true;
650 match event {
651 BrokerEvent::OrderUpdate(order) => {
652 info!(
653 order_id = %order.id,
654 status = ?order.status,
655 symbol = %order.request.symbol,
656 "received private order update"
657 );
658 self.event_bus
659 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
660 }
661 BrokerEvent::Fill(fill) => {
662 info!(
663 order_id = %fill.order_id,
664 symbol = %fill.symbol,
665 qty = %fill.fill_quantity,
666 price = %fill.fill_price,
667 "received private fill"
668 );
669 self.event_bus.publish(Event::Fill(FillEvent { fill }));
670 }
671 }
672 }
673 _ = orchestrator_timer.tick() => {
674 if let Err(e) = self.orchestrator.on_timer_tick().await {
676 error!("Orchestrator timer tick failed: {}", e);
677 }
678 }
679 else => {}
680 }
681
682 if !progressed && !self.shutdown.sleep(backoff).await {
683 break;
684 }
685 }
686 info!("live session stopping");
687 self.metrics_task.abort();
688 if let Some(handle) = self.alert_task.take() {
689 handle.abort();
690 }
691 if let Some(handle) = self.reconciliation_task.take() {
692 handle.abort();
693 }
694 self.order_timeout_task.abort();
695 for handle in self.subscriber_handles.drain(..) {
696 handle.abort();
697 }
698 for handle in self.connection_monitors.drain(..) {
699 handle.abort();
700 }
701 if let Err(err) = persist_state(
702 self.state_repo.clone(),
703 self.persisted.clone(),
704 Some(self.strategy.clone()),
705 )
706 .await
707 {
708 warn!(error = %err, "failed to persist shutdown state");
709 }
710 if let Some(task) = self.control_task.take() {
711 if let Err(err) = task.await {
712 warn!(error = %err, "control plane server task aborted");
713 }
714 }
715 if let Some(recorder) = self.recorder.take() {
716 if let Err(err) = recorder.shutdown().await {
717 warn!(error = %err, "failed to flush flight recorder");
718 }
719 }
720 Ok(())
721 }
722}
723
/// Everything a state reconciliation run needs: the exchange client, the local
/// portfolio and persisted state, and the sinks for alerts and metrics.
struct ReconciliationContext {
 /// Client used to fetch remote positions and balances.
 client: Arc<dyn ExecutionClient>,
 /// Local portfolio compared against the exchange.
 portfolio: Arc<Mutex<Portfolio>>,
 /// Persisted live state, updated when liquidate-only mode is entered.
 persisted: Arc<Mutex<LiveState>>,
 /// Repository used to persist the liquidate-only transition.
 state_repo: Arc<dyn StateRepository>,
 /// Receives a notification when a divergence reaches the threshold.
 alerts: Arc<AlertManager>,
 /// Receives the position and balance difference gauges.
 metrics: Arc<LiveMetrics>,
 /// Currency whose balance is compared against local cash.
 reporting_currency: Symbol,
 /// Normalized divergence at or above which a finding is treated as severe.
 threshold: Decimal,
}
734
/// Named-argument bundle for [`ReconciliationContext::new`]; the fields mirror
/// those of [`ReconciliationContext`].
struct ReconciliationContextConfig {
 /// Client used to fetch remote positions and balances.
 client: Arc<dyn ExecutionClient>,
 /// Local portfolio compared against the exchange.
 portfolio: Arc<Mutex<Portfolio>>,
 /// Persisted live state.
 persisted: Arc<Mutex<LiveState>>,
 /// Repository used to persist state changes.
 state_repo: Arc<dyn StateRepository>,
 /// Alert sink.
 alerts: Arc<AlertManager>,
 /// Metrics sink.
 metrics: Arc<LiveMetrics>,
 /// Currency whose balance is reconciled.
 reporting_currency: Symbol,
 /// Requested divergence threshold; non-positive values are replaced by the constructor.
 threshold: Decimal,
}
745
746impl ReconciliationContext {
747 fn new(config: ReconciliationContextConfig) -> Self {
748 let ReconciliationContextConfig {
749 client,
750 portfolio,
751 persisted,
752 state_repo,
753 alerts,
754 metrics,
755 reporting_currency,
756 threshold,
757 } = config;
758 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
760 min_threshold
761 } else {
762 threshold
763 };
764 Self {
765 client,
766 portfolio,
767 persisted,
768 state_repo,
769 alerts,
770 metrics,
771 reporting_currency,
772 threshold,
773 }
774 }
775}
776
777fn spawn_reconciliation_loop(
778 ctx: Arc<ReconciliationContext>,
779 shutdown: ShutdownSignal,
780 interval: Duration,
781) -> JoinHandle<()> {
782 tokio::spawn(async move {
783 while shutdown.sleep(interval).await {
784 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
785 error!(error = %err, "periodic state reconciliation failed");
786 }
787 }
788 })
789}
790
791async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
792 info!("running state reconciliation");
793 let remote_positions = ctx
794 .client
795 .positions()
796 .await
797 .context("failed to fetch remote positions")?;
798 let remote_balances = ctx
799 .client
800 .account_balances()
801 .await
802 .context("failed to fetch remote balances")?;
803 let (local_positions, local_cash) = {
804 let guard = ctx.portfolio.lock().await;
805 (guard.positions(), guard.cash())
806 };
807
808 let remote_map = positions_to_map(remote_positions);
809 let local_map = positions_to_map(local_positions);
810 let mut tracked_symbols: HashSet<String> = HashSet::new();
811 tracked_symbols.extend(remote_map.keys().cloned());
812 tracked_symbols.extend(local_map.keys().cloned());
813
814 let mut severe_findings = Vec::new();
815 for symbol in tracked_symbols {
816 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
817 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
818 let diff = (local_qty - remote_qty).abs();
819 let diff_value = diff.to_f64().unwrap_or(0.0);
820 ctx.metrics.update_position_diff(&symbol, diff_value);
821 if diff > Decimal::ZERO {
822 warn!(
823 symbol = %symbol,
824 local = %local_qty,
825 remote = %remote_qty,
826 diff = %diff,
827 "position mismatch detected during reconciliation"
828 );
829 let pct = normalize_diff(diff, remote_qty);
830 if pct >= ctx.threshold {
831 error!(
832 symbol = %symbol,
833 local = %local_qty,
834 remote = %remote_qty,
835 diff = %diff,
836 pct = %pct,
837 "position mismatch exceeds threshold"
838 );
839 severe_findings.push(format!(
840 "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
841 ));
842 }
843 }
844 }
845
846 let reporting = ctx.reporting_currency.as_str();
847 let remote_cash = remote_balances
848 .iter()
849 .find(|balance| balance.currency == reporting)
850 .map(|balance| balance.available)
851 .unwrap_or_else(|| Decimal::ZERO);
852 let cash_diff = (remote_cash - local_cash).abs();
853 ctx.metrics
854 .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
855 if cash_diff > Decimal::ZERO {
856 warn!(
857 currency = %reporting,
858 local = %local_cash,
859 remote = %remote_cash,
860 diff = %cash_diff,
861 "balance mismatch detected during reconciliation"
862 );
863 let pct = normalize_diff(cash_diff, remote_cash);
864 if pct >= ctx.threshold {
865 error!(
866 currency = %reporting,
867 local = %local_cash,
868 remote = %remote_cash,
869 diff = %cash_diff,
870 pct = %pct,
871 "balance mismatch exceeds threshold"
872 );
873 severe_findings.push(format!(
874 "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
875 ));
876 }
877 }
878
879 if severe_findings.is_empty() {
880 info!("state reconciliation complete with no critical divergence");
881 return Ok(());
882 }
883
884 let alert_body = severe_findings.join("; ");
885 ctx.alerts
886 .notify("State reconciliation divergence", &alert_body)
887 .await;
888 enforce_liquidate_only(ctx).await;
889 Ok(())
890}
891
892async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
893 let snapshot = {
894 let mut guard = ctx.portfolio.lock().await;
895 if !guard.set_liquidate_only(true) {
896 return;
897 }
898 info!("entering liquidate-only mode due to reconciliation divergence");
899 guard.snapshot()
900 };
901 {
902 let mut state = ctx.persisted.lock().await;
903 state.portfolio = Some(snapshot);
904 }
905 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
906 warn!(error = %err, "failed to persist liquidate-only transition");
907 }
908}
909
910fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
911 let mut map = HashMap::new();
912 for position in positions {
913 map.insert(position.symbol.clone(), position_signed_qty(&position));
914 }
915 map
916}
917
918fn position_signed_qty(position: &Position) -> Decimal {
919 match position.side {
920 Some(Side::Buy) => position.quantity,
921 Some(Side::Sell) => -position.quantity,
922 None => Decimal::ZERO,
923 }
924}
925
926fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
927 if diff <= Decimal::ZERO {
928 Decimal::ZERO
929 } else {
930 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
931 diff / denominator
932 }
933}
934
935fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
936 let mut payload = serde_json::Map::new();
937 payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
938 payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
939 payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
940 payload.insert(
941 "api_secret".into(),
942 Value::String(exchange.api_secret.clone()),
943 );
944 payload.insert(
945 "category".into(),
946 Value::String(settings.category.as_path().to_string()),
947 );
948 payload.insert(
949 "orderbook_depth".into(),
950 Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
951 );
952 if let Value::Object(extra) = exchange.params.clone() {
953 for (key, value) in extra {
954 payload.insert(key, value);
955 }
956 }
957 Value::Object(payload)
958}
959
/// Most recent market data cached per symbol.
#[derive(Default)]
struct MarketSnapshot {
 /// Last traded price; seeded from persisted prices when the runtime is built.
 last_trade: Option<Price>,
 /// Timestamp associated with `last_trade` (not read in the visible part of the file).
 last_trade_ts: Option<DateTime<Utc>>,
 /// Last candle; its close is the fallback price when no trade is known.
 last_candle: Option<Candle>,
}
966
967impl MarketSnapshot {
968 fn price(&self) -> Option<Price> {
969 self.last_trade
970 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
971 }
972}
973
974pub struct ShutdownSignal {
975 flag: Arc<AtomicBool>,
976 notify: Arc<Notify>,
977}
978
979impl ShutdownSignal {
980 pub fn new() -> Self {
981 let flag = Arc::new(AtomicBool::new(false));
982 let notify = Arc::new(Notify::new());
983 let flag_clone = flag.clone();
984 let notify_clone = notify.clone();
985 tokio::spawn(async move {
986 if tokio::signal::ctrl_c().await.is_ok() {
987 flag_clone.store(true, Ordering::SeqCst);
988 notify_clone.notify_waiters();
989 }
990 });
991 Self { flag, notify }
992 }
993
994 pub fn trigger(&self) {
995 self.flag.store(true, Ordering::SeqCst);
996 self.notify.notify_waiters();
997 }
998
999 pub fn triggered(&self) -> bool {
1000 self.flag.load(Ordering::SeqCst)
1001 }
1002
1003 pub async fn wait(&self) {
1004 if self.triggered() {
1005 return;
1006 }
1007 self.notify.notified().await;
1008 }
1009
1010 async fn sleep(&self, duration: Duration) -> bool {
1011 tokio::select! {
1012 _ = tokio::time::sleep(duration) => true,
1013 _ = self.notify.notified() => false,
1014 }
1015 }
1016}
1017
1018impl Default for ShutdownSignal {
1019 fn default() -> Self {
1020 Self::new()
1021 }
1022}
1023
1024impl Clone for ShutdownSignal {
1025 fn clone(&self) -> Self {
1026 Self {
1027 flag: self.flag.clone(),
1028 notify: self.notify.clone(),
1029 }
1030 }
1031}
1032
1033#[allow(clippy::too_many_arguments)]
1034fn spawn_event_subscribers(
1035 bus: Arc<EventBus>,
1036 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1037 strategy_ctx: Arc<Mutex<StrategyContext>>,
1038 orchestrator: Arc<OrderOrchestrator>,
1039 portfolio: Arc<Mutex<Portfolio>>,
1040 metrics: Arc<LiveMetrics>,
1041 alerts: Arc<AlertManager>,
1042 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1043 state_repo: Arc<dyn StateRepository>,
1044 persisted: Arc<Mutex<LiveState>>,
1045 exec_backend: ExecutionBackend,
1046 recorder: Option<RecorderHandle>,
1047 last_data_timestamp: Arc<AtomicI64>,
1048) -> Vec<JoinHandle<()>> {
1049 let mut handles = Vec::new();
1050 let market_recorder = recorder.clone();
1051
1052 let market_bus = bus.clone();
1053 let market_strategy = strategy.clone();
1054 let market_ctx = strategy_ctx.clone();
1055 let market_metrics = metrics.clone();
1056 let market_alerts = alerts.clone();
1057 let market_state = state_repo.clone();
1058 let market_persisted = persisted.clone();
1059 let market_portfolio = portfolio.clone();
1060 let market_snapshot = market.clone();
1061 let orchestrator_clone = orchestrator.clone();
1062 let market_data_tracker = last_data_timestamp.clone();
1063 handles.push(tokio::spawn(async move {
1064 let recorder = market_recorder;
1065 let mut stream = market_bus.subscribe();
1066 loop {
1067 match stream.recv().await {
1068 Ok(Event::Tick(evt)) => {
1069 if let Some(handle) = recorder.as_ref() {
1070 handle.record_tick(evt.tick.clone());
1071 }
1072 if let Err(err) = process_tick_event(
1073 evt.tick,
1074 market_strategy.clone(),
1075 market_ctx.clone(),
1076 market_metrics.clone(),
1077 market_alerts.clone(),
1078 market_snapshot.clone(),
1079 market_portfolio.clone(),
1080 market_state.clone(),
1081 market_persisted.clone(),
1082 market_bus.clone(),
1083 market_data_tracker.clone(),
1084 )
1085 .await
1086 {
1087 warn!(error = %err, "tick handler failed");
1088 }
1089 }
1090 Ok(Event::Candle(evt)) => {
1091 if let Some(handle) = recorder.as_ref() {
1092 handle.record_candle(evt.candle.clone());
1093 }
1094 if let Err(err) = process_candle_event(
1095 evt.candle,
1096 market_strategy.clone(),
1097 market_ctx.clone(),
1098 market_metrics.clone(),
1099 market_alerts.clone(),
1100 market_snapshot.clone(),
1101 market_portfolio.clone(),
1102 orchestrator_clone.clone(),
1103 exec_backend,
1104 market_state.clone(),
1105 market_persisted.clone(),
1106 market_bus.clone(),
1107 market_data_tracker.clone(),
1108 )
1109 .await
1110 {
1111 warn!(error = %err, "candle handler failed");
1112 }
1113 }
1114 Ok(Event::OrderBook(evt)) => {
1115 if let Err(err) = process_order_book_event(
1116 evt.order_book,
1117 market_strategy.clone(),
1118 market_ctx.clone(),
1119 market_metrics.clone(),
1120 market_alerts.clone(),
1121 market_snapshot.clone(),
1122 market_bus.clone(),
1123 market_data_tracker.clone(),
1124 )
1125 .await
1126 {
1127 warn!(error = %err, "order book handler failed");
1128 }
1129 }
1130 Ok(_) => {}
1131 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1132 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1133 warn!(lag = lag, "market subscriber lagged");
1134 continue;
1135 }
1136 }
1137 }
1138 }));
1139
1140 let exec_bus = bus.clone();
1141 let exec_portfolio = portfolio.clone();
1142 let exec_market = market.clone();
1143 let exec_persisted = persisted.clone();
1144 let exec_alerts = alerts.clone();
1145 let exec_metrics = metrics.clone();
1146 let exec_orchestrator = orchestrator.clone();
1147 handles.push(tokio::spawn(async move {
1148 let orchestrator = exec_orchestrator.clone();
1149 let mut stream = exec_bus.subscribe();
1150 loop {
1151 match stream.recv().await {
1152 Ok(Event::Signal(evt)) => {
1153 if let Err(err) = process_signal_event(
1154 evt.signal,
1155 orchestrator.clone(),
1156 exec_portfolio.clone(),
1157 exec_market.clone(),
1158 exec_persisted.clone(),
1159 exec_alerts.clone(),
1160 exec_metrics.clone(),
1161 )
1162 .await
1163 {
1164 warn!(error = %err, "signal handler failed");
1165 }
1166 }
1167 Ok(_) => {}
1168 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1169 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1170 warn!(lag = lag, "signal subscriber lagged");
1171 continue;
1172 }
1173 }
1174 }
1175 }));
1176
1177 let fill_bus = bus.clone();
1178 let fill_state = state_repo.clone();
1179 let fill_orchestrator = orchestrator.clone();
1180 let fill_persisted = persisted.clone();
1181 let fill_alerts = alerts.clone();
1182 let fill_recorder = recorder.clone();
1183 handles.push(tokio::spawn(async move {
1184 let orchestrator = fill_orchestrator.clone();
1185 let persisted = fill_persisted.clone();
1186 let recorder = fill_recorder;
1187 let mut stream = fill_bus.subscribe();
1188 loop {
1189 match stream.recv().await {
1190 Ok(Event::Fill(evt)) => {
1191 if let Some(handle) = recorder.as_ref() {
1192 handle.record_fill(evt.fill.clone());
1193 }
1194 if let Err(err) = process_fill_event(
1195 evt.fill,
1196 portfolio.clone(),
1197 strategy.clone(),
1198 strategy_ctx.clone(),
1199 orchestrator.clone(),
1200 metrics.clone(),
1201 fill_alerts.clone(),
1202 fill_state.clone(),
1203 persisted.clone(),
1204 )
1205 .await
1206 {
1207 warn!(error = %err, "fill handler failed");
1208 }
1209 }
1210 Ok(_) => {}
1211 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1212 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1213 warn!(lag = lag, "fill subscriber lagged");
1214 continue;
1215 }
1216 }
1217 }
1218 }));
1219
1220 let order_bus = bus.clone();
1221 let order_persisted = persisted.clone();
1222 let order_alerts = alerts.clone();
1223 let order_orchestrator = orchestrator.clone();
1224 let order_recorder = recorder;
1227 handles.push(tokio::spawn(async move {
1228 let orchestrator = order_orchestrator.clone();
1229 let persisted = order_persisted.clone();
1230 let recorder = order_recorder;
1231 let mut stream = order_bus.subscribe();
1232 loop {
1233 match stream.recv().await {
1234 Ok(Event::OrderUpdate(evt)) => {
1235 if let Some(handle) = recorder.as_ref() {
1236 handle.record_order(evt.order.clone());
1237 }
1238 if let Err(err) = process_order_update_event(
1239 evt.order,
1240 orchestrator.clone(),
1241 order_alerts.clone(),
1242 state_repo.clone(),
1243 persisted.clone(),
1244 )
1245 .await
1246 {
1247 warn!(error = %err, "order update handler failed");
1248 }
1249 }
1250 Ok(_) => {}
1251 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1252 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1253 warn!(lag = lag, "order subscriber lagged");
1254 continue;
1255 }
1256 }
1257 }
1258 }));
1259
1260 handles
1261}
1262
1263#[allow(clippy::too_many_arguments)]
1264async fn process_tick_event(
1265 tick: Tick,
1266 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1267 strategy_ctx: Arc<Mutex<StrategyContext>>,
1268 metrics: Arc<LiveMetrics>,
1269 alerts: Arc<AlertManager>,
1270 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1271 portfolio: Arc<Mutex<Portfolio>>,
1272 state_repo: Arc<dyn StateRepository>,
1273 persisted: Arc<Mutex<LiveState>>,
1274 bus: Arc<EventBus>,
1275 last_data_timestamp: Arc<AtomicI64>,
1276) -> Result<()> {
1277 metrics.inc_tick();
1278 metrics.update_staleness(0.0);
1279 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1280 last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1281 alerts.heartbeat().await;
1282 {
1283 let mut guard = market.lock().await;
1284 if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1285 snapshot.last_trade = Some(tick.price);
1286 snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1287 }
1288 }
1289 let mut drawdown_triggered = false;
1290 let mut snapshot_on_trigger = None;
1291 {
1292 let mut guard = portfolio.lock().await;
1293 let was_liquidate_only = guard.liquidate_only();
1294 match guard.update_market_data(&tick.symbol, tick.price) {
1295 Ok(_) => {
1296 if !was_liquidate_only && guard.liquidate_only() {
1297 drawdown_triggered = true;
1298 snapshot_on_trigger = Some(guard.snapshot());
1299 }
1300 }
1301 Err(err) => {
1302 warn!(
1303 symbol = %tick.symbol,
1304 error = %err,
1305 "failed to refresh market data"
1306 );
1307 }
1308 }
1309 }
1310 {
1311 let mut state = persisted.lock().await;
1312 state.last_prices.insert(tick.symbol.clone(), tick.price);
1313 if drawdown_triggered {
1314 if let Some(snapshot) = snapshot_on_trigger.take() {
1315 state.portfolio = Some(snapshot);
1316 }
1317 }
1318 }
1319 if drawdown_triggered {
1320 persist_state(
1321 state_repo.clone(),
1322 persisted.clone(),
1323 Some(strategy.clone()),
1324 )
1325 .await?;
1326 alert_liquidate_only(alerts.clone()).await;
1327 }
1328 {
1329 let mut ctx = strategy_ctx.lock().await;
1330 ctx.push_tick(tick.clone());
1331 let lock_start = Instant::now();
1332 let mut strat = strategy.lock().await;
1333 log_strategy_lock("tick", lock_start.elapsed());
1334 let call_start = Instant::now();
1335 strat
1336 .on_tick(&ctx, &tick)
1337 .await
1338 .context("strategy failure on tick event")?;
1339 log_strategy_call("tick", call_start.elapsed());
1340 }
1341 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1342 Ok(())
1343}
1344
1345#[allow(clippy::too_many_arguments)]
1346async fn process_candle_event(
1347 candle: Candle,
1348 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1349 strategy_ctx: Arc<Mutex<StrategyContext>>,
1350 metrics: Arc<LiveMetrics>,
1351 alerts: Arc<AlertManager>,
1352 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1353 portfolio: Arc<Mutex<Portfolio>>,
1354 orchestrator: Arc<OrderOrchestrator>,
1355 exec_backend: ExecutionBackend,
1356 state_repo: Arc<dyn StateRepository>,
1357 persisted: Arc<Mutex<LiveState>>,
1358 bus: Arc<EventBus>,
1359 last_data_timestamp: Arc<AtomicI64>,
1360) -> Result<()> {
1361 metrics.inc_candle();
1362 metrics.update_staleness(0.0);
1363 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1364 last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1365 alerts.heartbeat().await;
1366 metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1367 {
1368 let mut guard = market.lock().await;
1369 if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1370 snapshot.last_candle = Some(candle.clone());
1371 snapshot.last_trade = Some(candle.close);
1372 }
1373 }
1374 if exec_backend.is_paper() {
1375 let client = orchestrator.execution_engine().client();
1376 if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1377 paper.update_price(&candle.symbol, candle.close);
1378 }
1379 }
1380 let mut candle_drawdown_triggered = false;
1381 let mut candle_snapshot = None;
1382 {
1383 let mut guard = portfolio.lock().await;
1384 let was_liquidate_only = guard.liquidate_only();
1385 match guard.update_market_data(&candle.symbol, candle.close) {
1386 Ok(_) => {
1387 if !was_liquidate_only && guard.liquidate_only() {
1388 candle_drawdown_triggered = true;
1389 candle_snapshot = Some(guard.snapshot());
1390 }
1391 }
1392 Err(err) => {
1393 warn!(
1394 symbol = %candle.symbol,
1395 error = %err,
1396 "failed to refresh market data"
1397 );
1398 }
1399 }
1400 }
1401 if candle_drawdown_triggered {
1402 if let Some(snapshot) = candle_snapshot.take() {
1403 let mut persisted_guard = persisted.lock().await;
1404 persisted_guard.portfolio = Some(snapshot);
1405 }
1406 alert_liquidate_only(alerts.clone()).await;
1407 }
1408 {
1409 let mut ctx = strategy_ctx.lock().await;
1410 ctx.push_candle(candle.clone());
1411 let lock_start = Instant::now();
1412 let mut strat = strategy.lock().await;
1413 log_strategy_lock("candle", lock_start.elapsed());
1414 let call_start = Instant::now();
1415 strat
1416 .on_candle(&ctx, &candle)
1417 .await
1418 .context("strategy failure on candle event")?;
1419 log_strategy_call("candle", call_start.elapsed());
1420 }
1421 {
1422 let mut snapshot = persisted.lock().await;
1423 snapshot.last_candle_ts = Some(candle.timestamp);
1424 snapshot
1425 .last_prices
1426 .insert(candle.symbol.clone(), candle.close);
1427 }
1428 persist_state(
1429 state_repo.clone(),
1430 persisted.clone(),
1431 Some(strategy.clone()),
1432 )
1433 .await?;
1434 let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1435 orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1436 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1437 Ok(())
1438}
1439
1440#[allow(clippy::too_many_arguments)]
1441async fn process_order_book_event(
1442 book: OrderBook,
1443 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1444 strategy_ctx: Arc<Mutex<StrategyContext>>,
1445 metrics: Arc<LiveMetrics>,
1446 alerts: Arc<AlertManager>,
1447 _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1448 bus: Arc<EventBus>,
1449 last_data_timestamp: Arc<AtomicI64>,
1450) -> Result<()> {
1451 metrics.update_staleness(0.0);
1452 alerts.heartbeat().await;
1453 last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1454 {
1455 let mut ctx = strategy_ctx.lock().await;
1456 ctx.push_order_book(book.clone());
1457 let lock_start = Instant::now();
1458 let mut strat = strategy.lock().await;
1459 log_strategy_lock("order_book", lock_start.elapsed());
1460 let call_start = Instant::now();
1461 strat
1462 .on_order_book(&ctx, &book)
1463 .await
1464 .context("strategy failure on order book")?;
1465 log_strategy_call("order_book", call_start.elapsed());
1466 }
1467 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1468 Ok(())
1469}
1470
1471async fn process_signal_event(
1472 signal: Signal,
1473 orchestrator: Arc<OrderOrchestrator>,
1474 portfolio: Arc<Mutex<Portfolio>>,
1475 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1476 persisted: Arc<Mutex<LiveState>>,
1477 alerts: Arc<AlertManager>,
1478 metrics: Arc<LiveMetrics>,
1479) -> Result<()> {
1480 let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1481 orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1482 match orchestrator.on_signal(&signal, &ctx).await {
1483 Ok(()) => {
1484 alerts.reset_order_failures().await;
1485 }
1486 Err(err) => {
1487 metrics.inc_order_failure();
1488 alerts
1489 .order_failure(&format!("orchestrator error: {err}"))
1490 .await;
1491 }
1492 }
1493 Ok(())
1494}
1495
1496fn log_strategy_lock(event: &str, wait: Duration) {
1497 let wait_ms = wait.as_secs_f64() * 1000.0;
1498 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1499 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1500 } else {
1501 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1502 }
1503}
1504
1505fn log_strategy_call(event: &str, elapsed: Duration) {
1506 let duration_ms = elapsed.as_secs_f64() * 1000.0;
1507 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1508 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1509 } else {
1510 trace!(target: "strategy", event, duration_ms, "strategy call completed");
1511 }
1512}
1513
1514#[allow(clippy::too_many_arguments)]
1515async fn process_fill_event(
1516 fill: Fill,
1517 portfolio: Arc<Mutex<Portfolio>>,
1518 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1519 strategy_ctx: Arc<Mutex<StrategyContext>>,
1520 orchestrator: Arc<OrderOrchestrator>,
1521 metrics: Arc<LiveMetrics>,
1522 alerts: Arc<AlertManager>,
1523 state_repo: Arc<dyn StateRepository>,
1524 persisted: Arc<Mutex<LiveState>>,
1525) -> Result<()> {
1526 let mut drawdown_triggered = false;
1527 {
1528 let mut guard = portfolio.lock().await;
1529 let was_liquidate_only = guard.liquidate_only();
1530 guard
1531 .apply_fill(&fill)
1532 .context("Failed to apply fill to portfolio")?;
1533 if !was_liquidate_only && guard.liquidate_only() {
1534 drawdown_triggered = true;
1535 }
1536 let snapshot = guard.snapshot();
1537 let mut persisted_guard = persisted.lock().await;
1538 persisted_guard.portfolio = Some(snapshot);
1539 }
1540 {
1541 let positions = {
1542 let guard = portfolio.lock().await;
1543 guard.positions()
1544 };
1545 let mut ctx = strategy_ctx.lock().await;
1546 ctx.update_positions(positions);
1547 }
1548 orchestrator.on_fill(&fill).await.ok();
1549 {
1550 let ctx = strategy_ctx.lock().await;
1551 let lock_start = Instant::now();
1552 let mut strat = strategy.lock().await;
1553 log_strategy_lock("fill", lock_start.elapsed());
1554 let call_start = Instant::now();
1555 strat
1556 .on_fill(&ctx, &fill)
1557 .await
1558 .context("Strategy failed on fill event")?;
1559 log_strategy_call("fill", call_start.elapsed());
1560 }
1561 let equity = {
1562 let guard = portfolio.lock().await;
1563 guard.equity()
1564 };
1565 if let Some(value) = equity.to_f64() {
1566 metrics.update_equity(value);
1567 }
1568 alerts.update_equity(equity).await;
1569 metrics.inc_order();
1570 alerts
1571 .notify(
1572 "Order Filled",
1573 &format!(
1574 "order filled: {}@{} ({})",
1575 fill.fill_quantity,
1576 fill.fill_price,
1577 match fill.side {
1578 Side::Buy => "buy",
1579 Side::Sell => "sell",
1580 }
1581 ),
1582 )
1583 .await;
1584 if drawdown_triggered {
1585 alert_liquidate_only(alerts.clone()).await;
1586 }
1587 persist_state(
1588 state_repo.clone(),
1589 persisted.clone(),
1590 Some(strategy.clone()),
1591 )
1592 .await?;
1593 Ok(())
1594}
1595
1596async fn process_order_update_event(
1597 order: Order,
1598 orchestrator: Arc<OrderOrchestrator>,
1599 alerts: Arc<AlertManager>,
1600 state_repo: Arc<dyn StateRepository>,
1601 persisted: Arc<Mutex<LiveState>>,
1602) -> Result<()> {
1603 orchestrator.on_order_update(&order);
1604 if matches!(order.status, OrderStatus::Rejected) {
1605 error!(
1606 order_id = %order.id,
1607 symbol = %order.request.symbol,
1608 "order rejected by exchange"
1609 );
1610 alerts.order_failure("order rejected by exchange").await;
1611 alerts
1612 .notify(
1613 "Order rejected",
1614 &format!(
1615 "Order {} for {} was rejected",
1616 order.id, order.request.symbol
1617 ),
1618 )
1619 .await;
1620 }
1621 {
1622 let mut snapshot = persisted.lock().await;
1623 let mut found = false;
1624 for existing in &mut snapshot.open_orders {
1625 if existing.id == order.id {
1626 *existing = order.clone();
1627 found = true;
1628 break;
1629 }
1630 }
1631 if !found {
1632 snapshot.open_orders.push(order.clone());
1633 }
1634 if matches!(
1635 order.status,
1636 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1637 ) {
1638 snapshot.open_orders.retain(|o| o.id != order.id);
1639 }
1640 }
1641 persist_state(state_repo, persisted, None).await?;
1642 Ok(())
1643}
1644
1645async fn emit_signals(
1646 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1647 bus: Arc<EventBus>,
1648 metrics: Arc<LiveMetrics>,
1649) {
1650 let signals = {
1651 let mut strat = strategy.lock().await;
1652 strat.drain_signals()
1653 };
1654 if signals.is_empty() {
1655 return;
1656 }
1657 metrics.inc_signals(signals.len());
1658 for signal in signals {
1659 bus.publish(Event::Signal(SignalEvent { signal }));
1660 }
1661}
1662
1663async fn persist_state(
1664 repo: Arc<dyn StateRepository>,
1665 persisted: Arc<Mutex<LiveState>>,
1666 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1667) -> Result<()> {
1668 if let Some(strat_lock) = strategy {
1669 let strat = strat_lock.lock().await;
1671 if let Ok(json_state) = strat.snapshot() {
1672 let mut guard = persisted.lock().await;
1673 guard.strategy_state = Some(json_state);
1674 } else {
1675 warn!("failed to snapshot strategy state");
1676 }
1677 }
1678
1679 let snapshot = {
1680 let guard = persisted.lock().await;
1681 guard.clone()
1682 };
1683 tokio::task::spawn_blocking(move || repo.save(&snapshot))
1684 .await
1685 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1686 .map_err(|err| anyhow!(err.to_string()))
1687}
1688
1689async fn shared_risk_context(
1690 symbol: &str,
1691 portfolio: &Arc<Mutex<Portfolio>>,
1692 market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1693 persisted: &Arc<Mutex<LiveState>>,
1694) -> RiskContext {
1695 let (signed_qty, equity, liquidate_only) = {
1696 let guard = portfolio.lock().await;
1697 (
1698 guard.signed_position_qty(symbol),
1699 guard.equity(),
1700 guard.liquidate_only(),
1701 )
1702 };
1703 let observed_price = {
1704 let guard = market.lock().await;
1705 guard.get(symbol).and_then(|snapshot| snapshot.price())
1706 };
1707 let last_price = if let Some(price) = observed_price {
1708 price
1709 } else {
1710 let guard = persisted.lock().await;
1711 guard
1712 .last_prices
1713 .get(symbol)
1714 .copied()
1715 .unwrap_or(Decimal::ZERO)
1716 };
1717 RiskContext {
1718 signed_position_qty: signed_qty,
1719 portfolio_equity: equity,
1720 last_price,
1721 liquidate_only,
1722 }
1723}
1724
1725async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1726 alerts
1727 .notify(
1728 "Max drawdown triggered",
1729 "Portfolio entered liquidate-only mode; new exposure blocked until review",
1730 )
1731 .await;
1732}
1733
1734fn spawn_connection_monitor(
1735 shutdown: ShutdownSignal,
1736 flag: Arc<AtomicBool>,
1737 metrics: Arc<LiveMetrics>,
1738 stream: &'static str,
1739) -> JoinHandle<()> {
1740 tokio::spawn(async move {
1741 loop {
1742 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1743 if !shutdown.sleep(Duration::from_secs(5)).await {
1744 break;
1745 }
1746 }
1747 })
1748}
1749
1750fn spawn_order_timeout_monitor(
1751 orchestrator: Arc<OrderOrchestrator>,
1752 bus: Arc<EventBus>,
1753 alerts: Arc<AlertManager>,
1754 shutdown: ShutdownSignal,
1755) -> JoinHandle<()> {
1756 tokio::spawn(async move {
1757 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1758 loop {
1759 ticker.tick().await;
1760 if shutdown.triggered() {
1761 break;
1762 }
1763 match orchestrator.poll_stale_orders().await {
1764 Ok(updates) => {
1765 for order in updates {
1766 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1767 let message = format!(
1768 "Order {} for {} timed out after {}s",
1769 order.id,
1770 order.request.symbol,
1771 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1772 );
1773 error!(%message);
1774 alerts.order_failure(&message).await;
1775 alerts.notify("Order timeout", &message).await;
1776 }
1777 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1778 }
1779 }
1780 Err(err) => {
1781 warn!(error = %err, "order timeout monitor failed");
1782 }
1783 }
1784 }
1785 })
1786}
1787
1788async fn load_market_registry(
1789 client: Arc<dyn ExecutionClient>,
1790 settings: &LiveSessionSettings,
1791) -> Result<Arc<MarketRegistry>> {
1792 if let Some(path) = &settings.markets_file {
1793 let registry = MarketRegistry::load_from_file(path)
1794 .with_context(|| format!("failed to load markets from {}", path.display()))?;
1795 return Ok(Arc::new(registry));
1796 }
1797
1798 if settings.exec_backend.is_paper() {
1799 return Err(anyhow!(
1800 "paper execution requires --markets-file when exchange metadata is unavailable"
1801 ));
1802 }
1803
1804 let instruments = client
1805 .list_instruments(settings.category.as_path())
1806 .await
1807 .context("failed to fetch instruments from execution client")?;
1808 let registry =
1809 MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1810 Ok(Arc::new(registry))
1811}
1812
/// Decodes one text frame from the Bybit private stream into broker events.
///
/// Frames that are not valid JSON, carry no string `topic`, use a topic other
/// than `order`/`execution`, or fail to deserialize yield no events. Entries
/// that cannot be converted to the internal order/fill types are skipped.
#[cfg(feature = "bybit")]
fn decode_bybit_private_events(text: &str) -> Vec<BrokerEvent> {
    let mut events = Vec::new();
    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return events;
    };
    let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) else {
        return events;
    };
    match topic {
        "order" => {
            if let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) {
                for update in msg.data {
                    if let Ok(order) = update.to_tesser_order(None) {
                        events.push(BrokerEvent::OrderUpdate(order));
                    }
                }
            }
        }
        "execution" => {
            if let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value) {
                for exec in msg.data {
                    if let Ok(fill) = exec.to_tesser_fill() {
                        events.push(BrokerEvent::Fill(fill));
                    }
                }
            }
        }
        _ => {}
    }
    events
}

/// Spawns the detached task that maintains the Bybit private WebSocket stream.
///
/// On every successful connection the task:
/// 1. marks the private connection as up (flag and metrics);
/// 2. forwards the currently open orders for each symbol as order updates;
/// 3. when the execution client is a `BybitClient`, forwards executions since
///    the last successful sync (or the last 30 minutes when there is none);
/// 4. forwards `order` and `execution` frames as broker events until the
///    stream ends, errors, or shutdown is signalled.
///
/// After a disconnect or a failed connection attempt the connection is marked
/// down and the task waits five seconds (interruptible by shutdown) before
/// reconnecting.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    const RECONNECT_DELAY: Duration = Duration::from_secs(5);

    tokio::spawn(async move {
        // Keeps the shared flag and the metrics gauge in step.
        let set_connected = |connected: bool| {
            if let Some(flag) = &private_connection_flag {
                flag.store(connected, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", connected);
        };

        while !shutdown.triggered() {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    set_connected(true);
                    info!("Connected to Bybit private WebSocket stream");

                    // Reconcile open orders that may have changed while the
                    // stream was down.
                    for symbol in &symbols {
                        match exec_client.list_open_orders(symbol).await {
                            Ok(orders) => {
                                for order in orders {
                                    if let Err(err) =
                                        private_tx.send(BrokerEvent::OrderUpdate(order)).await
                                    {
                                        error!("failed to send reconciled order update: {err}");
                                    }
                                }
                            }
                            Err(e) => {
                                error!("failed to reconcile open orders for {symbol}: {e}");
                            }
                        }
                    }

                    // Reconcile executions missed while disconnected.
                    if let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() {
                        let since = {
                            let guard = last_sync.lock().await;
                            guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
                        };
                        match bybit.list_executions_since(since).await {
                            Ok(fills) => {
                                for fill in fills {
                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await
                                    {
                                        error!("failed to send reconciled fill: {err}");
                                    }
                                }
                                // Advance the watermark only after a
                                // successful fetch; otherwise the executions
                                // in the failed window would never be
                                // requested again.
                                *last_sync.lock().await = Some(Utc::now());
                            }
                            Err(e) => {
                                error!("failed to reconcile executions since {:?}: {}", since, e);
                            }
                        }
                    }

                    loop {
                        // Race the read against shutdown so an idle stream
                        // cannot keep the task alive.
                        let frame = tokio::select! {
                            frame = socket.next() => frame,
                            _ = shutdown.wait() => break,
                        };
                        let Some(frame) = frame else {
                            warn!("Bybit private WebSocket stream closed");
                            break;
                        };
                        match frame {
                            Ok(Message::Text(text)) => {
                                for event in decode_bybit_private_events(&text) {
                                    let kind = if matches!(event, BrokerEvent::Fill(_)) {
                                        "fill event"
                                    } else {
                                        "order update"
                                    };
                                    if let Err(err) = private_tx.send(event).await {
                                        error!("failed to send private {kind}: {err}");
                                    }
                                }
                            }
                            Ok(_) => {}
                            Err(err) => {
                                warn!("Bybit private WebSocket stream error: {err}");
                                break;
                            }
                        }
                    }

                    // The stream is gone (or shutdown was requested): report
                    // the connection as down before deciding what to do next.
                    set_connected(false);
                }
                Err(e) => {
                    set_connected(false);
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                }
            }

            // Back off before reconnecting; the wait ends early on shutdown.
            if !shutdown.sleep(RECONNECT_DELAY).await {
                break;
            }
        }
    });
}
1950
/// Spawns the detached task that maintains the Binance user-data stream.
///
/// The task exits immediately (with a warning) when the execution client is
/// not a `BinanceClient`. Otherwise it loops: obtain a listen key, connect
/// the user-data stream, forward order updates and fills to `private_tx`,
/// keep the listen key alive every 30 minutes, and reconnect when the key
/// expires. The private connection flag and metric are set while connected
/// and cleared before each retry. Retries wait five seconds and stop as soon
/// as shutdown is signalled.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    const RETRY_DELAY: Duration = Duration::from_secs(5);
    const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30 * 60);

    tokio::spawn(async move {
        // The concrete client type cannot change between iterations, so the
        // downcast is done once.
        let Some(binance) = exec_client
            .as_ref()
            .as_any()
            .downcast_ref::<BinanceClient>()
        else {
            warn!("execution client is not Binance");
            return;
        };

        loop {
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // Honour shutdown here too; a bare `continue` would keep
                    // retrying forever while the API is failing.
                    if !shutdown.sleep(RETRY_DELAY).await || shutdown.triggered() {
                        break;
                    }
                    continue;
                }
            };

            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);

                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    // NOTE(review): `blocking_send` panics when called from
                    // within an async execution context — confirm that
                    // `on_event` callbacks run on a non-runtime thread.
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });

                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(KEEPALIVE_INTERVAL);
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if let Err(err) = client.keepalive_user_stream().await {
                                warn!("failed to keep Binance user stream alive: {err}");
                                break;
                            }
                        }
                    });

                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {
                            keepalive_handle.abort();
                            let _ = user_stream.unsubscribe().await;
                            return;
                        }
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }

            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);

            // Stop right away on shutdown; otherwise back off, ending the
            // wait early if shutdown arrives in the meantime.
            if shutdown.triggered() || !shutdown.sleep(RETRY_DELAY).await {
                break;
            }
        }
    });
}