1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23 static INIT: Once = Once::new();
24 INIT.call_once(|| {
25 register_connector_factory(Arc::new(PaperFactory::default()));
26 #[cfg(feature = "bybit")]
27 register_bybit_factory();
28 #[cfg(feature = "binance")]
29 register_binance_factory();
30 });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35 fill_from_update, order_from_update, register_factory as register_binance_factory,
36 ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37 BinanceClient,
38};
39use tesser_broker::{
40 get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41 ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, RiskManagementConfig};
48use tesser_core::{
49 Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price, Quantity, Side, Signal,
50 Symbol, Tick,
51};
52use tesser_events::{
53 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
54 TickEvent,
55};
56use tesser_execution::{
57 BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator, PreTradeRiskChecker,
58 RiskContext, RiskLimits, SqliteAlgoStateRepository,
59};
60use tesser_markets::MarketRegistry;
61use tesser_paper::{PaperExecutionClient, PaperFactory};
62use tesser_portfolio::{
63 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
64};
65use tesser_strategy::{Strategy, StrategyContext};
66
67use crate::alerts::{AlertDispatcher, AlertManager};
68use crate::telemetry::{spawn_metrics_server, LiveMetrics};
69use crate::PublicChannel;
70
71#[derive(Debug)]
73pub enum BrokerEvent {
74 OrderUpdate(Order),
75 Fill(Fill),
76}
77
78#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
79#[value(rename_all = "kebab-case")]
80pub enum ExecutionBackend {
81 Paper,
82 Live,
83}
84
85impl ExecutionBackend {
86 fn is_paper(self) -> bool {
87 matches!(self, Self::Paper)
88 }
89}
90
/// Order book depth used when none is configured explicitly.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order book depth (`DEFAULT_ORDER_BOOK_DEPTH`).
///
/// Declared `const fn` so it can also be evaluated at compile time.
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}
// Latency thresholds for strategy interaction; presumably used by the event handlers
// (outside this view) to warn about slow lock acquisition and slow strategy calls.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
98
99#[async_trait::async_trait]
100trait LiveMarketStream: Send {
101 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
102 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
103 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
104}
105
106struct FactoryStreamAdapter {
107 inner: Box<dyn ConnectorStream>,
108}
109
110impl FactoryStreamAdapter {
111 fn new(inner: Box<dyn ConnectorStream>) -> Self {
112 Self { inner }
113 }
114}
115
116#[async_trait::async_trait]
117impl LiveMarketStream for FactoryStreamAdapter {
118 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
119 self.inner.next_tick().await
120 }
121
122 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
123 self.inner.next_candle().await
124 }
125
126 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
127 self.inner.next_order_book().await
128 }
129}
130
131pub struct LiveSessionSettings {
132 pub category: PublicChannel,
133 pub interval: Interval,
134 pub quantity: Quantity,
135 pub slippage_bps: Decimal,
136 pub fee_bps: Decimal,
137 pub history: usize,
138 pub metrics_addr: SocketAddr,
139 pub state_path: PathBuf,
140 pub initial_balances: HashMap<Symbol, Decimal>,
141 pub reporting_currency: Symbol,
142 pub markets_file: Option<PathBuf>,
143 pub alerting: AlertingConfig,
144 pub exec_backend: ExecutionBackend,
145 pub risk: RiskManagementConfig,
146 pub reconciliation_interval: Duration,
147 pub reconciliation_threshold: Decimal,
148 pub driver: String,
149 pub orderbook_depth: usize,
150}
151
152impl LiveSessionSettings {
153 fn risk_limits(&self) -> RiskLimits {
154 RiskLimits {
155 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
156 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
157 }
158 }
159}
160
161pub async fn run_live(
162 strategy: Box<dyn Strategy>,
163 symbols: Vec<String>,
164 exchange: ExchangeConfig,
165 settings: LiveSessionSettings,
166) -> Result<()> {
167 run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
168}
169
170pub async fn run_live_with_shutdown(
172 strategy: Box<dyn Strategy>,
173 symbols: Vec<String>,
174 exchange: ExchangeConfig,
175 settings: LiveSessionSettings,
176 shutdown: ShutdownSignal,
177) -> Result<()> {
178 if symbols.is_empty() {
179 return Err(anyhow!("strategy did not declare any subscriptions"));
180 }
181 if settings.quantity <= Decimal::ZERO {
182 return Err(anyhow!("--quantity must be positive"));
183 }
184
185 let public_connection = Arc::new(AtomicBool::new(false));
186 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
187 Some(Arc::new(AtomicBool::new(false)))
188 } else {
189 None
190 };
191 ensure_builtin_connectors_registered();
192 let connector_payload = build_exchange_payload(&exchange, &settings);
193 let connector_factory = get_connector_factory(&settings.driver)
194 .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
195 let stream_config = ConnectorStreamConfig {
196 ws_url: Some(exchange.ws_url.clone()),
197 metadata: json!({
198 "category": settings.category.as_path(),
199 "symbols": symbols.clone(),
200 "orderbook_depth": settings.orderbook_depth,
201 }),
202 connection_status: Some(public_connection.clone()),
203 };
204 let mut connector_stream = connector_factory
205 .create_market_stream(&connector_payload, stream_config)
206 .await
207 .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
208 connector_stream
209 .subscribe(&symbols, settings.interval)
210 .await
211 .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
212 let market_stream: Box<dyn LiveMarketStream> =
213 Box::new(FactoryStreamAdapter::new(connector_stream));
214
215 let execution_client =
216 build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
217 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
218 if matches!(settings.exec_backend, ExecutionBackend::Live) {
219 info!(
220 rest = %exchange.rest_url,
221 driver = ?settings.driver,
222 "live execution enabled via {:?} REST",
223 settings.driver
224 );
225 }
226 let risk_checker: Arc<dyn PreTradeRiskChecker> =
227 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
228 let execution = ExecutionEngine::new(
229 execution_client.clone(),
230 Box::new(FixedOrderSizer {
231 quantity: settings.quantity,
232 }),
233 risk_checker,
234 );
235
236 let algo_repo_path = settings.state_path.with_extension("algos.db");
238 let algo_state_repo = Arc::new(SqliteAlgoStateRepository::new(&algo_repo_path)?);
239
240 let orchestrator = OrderOrchestrator::new(Arc::new(execution), algo_state_repo).await?;
242
243 let runtime = LiveRuntime::new(
244 market_stream,
245 strategy,
246 symbols,
247 orchestrator,
248 settings,
249 exchange.ws_url.clone(),
250 market_registry,
251 shutdown,
252 public_connection,
253 private_connection,
254 )
255 .await?;
256 runtime.run().await
257}
258
259async fn build_execution_client(
260 settings: &LiveSessionSettings,
261 connector_factory: Arc<dyn ConnectorFactory>,
262 connector_payload: &Value,
263) -> Result<Arc<dyn ExecutionClient>> {
264 match settings.exec_backend {
265 ExecutionBackend::Paper => {
266 if settings.driver == "paper" {
267 return connector_factory
268 .create_execution_client(connector_payload)
269 .await
270 .map_err(|err| anyhow!("failed to create execution client: {err}"));
271 }
272 Ok(Arc::new(PaperExecutionClient::new(
273 "paper".to_string(),
274 vec!["BTCUSDT".to_string()],
275 settings.slippage_bps,
276 settings.fee_bps,
277 )))
278 }
279 ExecutionBackend::Live => connector_factory
280 .create_execution_client(connector_payload)
281 .await
282 .map_err(|err| anyhow!("failed to create execution client: {err}")),
283 }
284}
285
286struct LiveRuntime {
287 market: Box<dyn LiveMarketStream>,
288 orchestrator: Arc<OrderOrchestrator>,
289 state_repo: Arc<dyn StateRepository>,
290 persisted: Arc<Mutex<LiveState>>,
291 event_bus: Arc<EventBus>,
292 shutdown: ShutdownSignal,
293 metrics_task: JoinHandle<()>,
294 alert_task: Option<JoinHandle<()>>,
295 reconciliation_task: Option<JoinHandle<()>>,
296 reconciliation_ctx: Option<Arc<ReconciliationContext>>,
297 private_event_rx: mpsc::Receiver<BrokerEvent>,
298 #[allow(dead_code)]
299 last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
300 subscriber_handles: Vec<JoinHandle<()>>,
301 connection_monitors: Vec<JoinHandle<()>>,
302 order_timeout_task: JoinHandle<()>,
303 strategy: Arc<Mutex<Box<dyn Strategy>>>,
304 _public_connection: Arc<AtomicBool>,
305 _private_connection: Option<Arc<AtomicBool>>,
306}
307
308impl LiveRuntime {
309 #[allow(clippy::too_many_arguments)]
310 async fn new(
311 market: Box<dyn LiveMarketStream>,
312 mut strategy: Box<dyn Strategy>,
313 symbols: Vec<String>,
314 orchestrator: OrderOrchestrator,
315 settings: LiveSessionSettings,
316 #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
317 market_registry: Arc<MarketRegistry>,
318 shutdown: ShutdownSignal,
319 public_connection: Arc<AtomicBool>,
320 private_connection: Option<Arc<AtomicBool>>,
321 ) -> Result<Self> {
322 let mut strategy_ctx = StrategyContext::new(settings.history);
323 let state_repo: Arc<dyn StateRepository> =
324 Arc::new(SqliteStateRepository::new(settings.state_path.clone()));
325 let mut persisted = match tokio::task::spawn_blocking({
326 let repo = state_repo.clone();
327 move || repo.load()
328 })
329 .await
330 {
331 Ok(Ok(state)) => state,
332 Ok(Err(err)) => {
333 warn!(error = %err, "failed to load live state; starting from defaults");
334 LiveState::default()
335 }
336 Err(err) => {
337 warn!(error = %err, "state load task failed; starting from defaults");
338 LiveState::default()
339 }
340 };
341 let mut live_bootstrap = None;
342 if matches!(settings.exec_backend, ExecutionBackend::Live) {
343 info!("Synchronizing portfolio state from exchange");
344 let execution_client = orchestrator.execution_engine().client();
345 let positions = execution_client
346 .positions()
347 .await
348 .context("failed to fetch remote positions")?;
349 let balances = execution_client
350 .account_balances()
351 .await
352 .context("failed to fetch remote account balances")?;
353 let mut open_orders = Vec::new();
354 for symbol in &symbols {
355 let mut symbol_orders = execution_client
356 .list_open_orders(symbol)
357 .await
358 .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
359 open_orders.append(&mut symbol_orders);
360 }
361 persisted.open_orders = open_orders;
362 live_bootstrap = Some((positions, balances));
363 }
364
365 let portfolio_cfg = PortfolioConfig {
366 initial_balances: settings.initial_balances.clone(),
367 reporting_currency: settings.reporting_currency.clone(),
368 max_drawdown: Some(settings.risk.max_drawdown),
369 };
370 let portfolio = if let Some((positions, balances)) = live_bootstrap {
371 Portfolio::from_exchange_state(
372 positions,
373 balances,
374 portfolio_cfg.clone(),
375 market_registry.clone(),
376 )
377 } else if let Some(snapshot) = persisted.portfolio.take() {
378 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
379 } else {
380 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
381 };
382 strategy_ctx.update_positions(portfolio.positions());
383 if let Some(state) = persisted.strategy_state.take() {
385 info!("restoring strategy state from persistence");
386 strategy
387 .restore(state)
388 .context("failed to restore strategy state")?;
389 }
390 persisted.portfolio = Some(portfolio.snapshot());
391
392 let mut market_snapshots = HashMap::new();
393 for symbol in &symbols {
394 let mut snapshot = MarketSnapshot::default();
395 if let Some(price) = persisted.last_prices.get(symbol).copied() {
396 snapshot.last_trade = Some(price);
397 }
398 market_snapshots.insert(symbol.clone(), snapshot);
399 }
400
401 let metrics = LiveMetrics::new();
402 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
403 if let Some(flag) = &private_connection {
404 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
405 }
406 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
407 let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
408 let alerts = AlertManager::new(
409 settings.alerting,
410 dispatcher,
411 Some(public_connection.clone()),
412 private_connection.clone(),
413 );
414 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
415 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
416 let alerts = Arc::new(alerts);
417 let alert_task = alerts.spawn_watchdog();
418 let metrics = Arc::new(metrics);
419 let mut connection_monitors = Vec::new();
420 connection_monitors.push(spawn_connection_monitor(
421 shutdown.clone(),
422 public_connection.clone(),
423 metrics.clone(),
424 "public",
425 ));
426 if let Some(flag) = private_connection.clone() {
427 connection_monitors.push(spawn_connection_monitor(
428 shutdown.clone(),
429 flag,
430 metrics.clone(),
431 "private",
432 ));
433 }
434
435 if !settings.exec_backend.is_paper() {
436 let execution_engine = orchestrator.execution_engine();
437 let exec_client = execution_engine.client();
438 match settings.driver.as_str() {
439 "bybit" | "" => {
440 #[cfg(feature = "bybit")]
441 {
442 let bybit = exec_client
443 .as_ref()
444 .as_any()
445 .downcast_ref::<BybitClient>()
446 .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
447 let creds = bybit
448 .get_credentials()
449 .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
450 spawn_bybit_private_stream(
451 creds,
452 bybit.get_ws_url(),
453 private_event_tx.clone(),
454 exec_client.clone(),
455 symbols.clone(),
456 last_private_sync.clone(),
457 private_connection.clone(),
458 metrics.clone(),
459 shutdown.clone(),
460 );
461 }
462 #[cfg(not(feature = "bybit"))]
463 {
464 bail!("driver 'bybit' is unavailable without the 'bybit' feature");
465 }
466 }
467 "binance" => {
468 #[cfg(feature = "binance")]
469 {
470 spawn_binance_private_stream(
471 exec_client.clone(),
472 exchange_ws_url.clone(),
473 private_event_tx.clone(),
474 private_connection.clone(),
475 metrics.clone(),
476 shutdown.clone(),
477 );
478 }
479 #[cfg(not(feature = "binance"))]
480 {
481 bail!("driver 'binance' is unavailable without the 'binance' feature");
482 }
483 }
484 "paper" => {}
485 other => {
486 bail!("private stream unsupported for driver '{other}'");
487 }
488 }
489 }
490
491 let strategy = Arc::new(Mutex::new(strategy));
492 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
493 let portfolio = Arc::new(Mutex::new(portfolio));
494 let market_cache = Arc::new(Mutex::new(market_snapshots));
495 let persisted = Arc::new(Mutex::new(persisted));
496 let orchestrator = Arc::new(orchestrator);
497 let event_bus = Arc::new(EventBus::new(2048));
498 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
499 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
500 client: orchestrator.execution_engine().client(),
501 portfolio: portfolio.clone(),
502 persisted: persisted.clone(),
503 state_repo: state_repo.clone(),
504 alerts: alerts.clone(),
505 metrics: metrics.clone(),
506 reporting_currency: settings.reporting_currency.clone(),
507 threshold: settings.reconciliation_threshold,
508 }))
509 });
510 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
511 spawn_reconciliation_loop(
512 ctx.clone(),
513 shutdown.clone(),
514 settings.reconciliation_interval,
515 )
516 });
517 let subscriber_handles = spawn_event_subscribers(
518 event_bus.clone(),
519 strategy.clone(),
520 strategy_ctx.clone(),
521 orchestrator.clone(),
522 portfolio.clone(),
523 metrics.clone(),
524 alerts.clone(),
525 market_cache.clone(),
526 state_repo.clone(),
527 persisted.clone(),
528 settings.exec_backend,
529 );
530 let order_timeout_task = spawn_order_timeout_monitor(
531 orchestrator.clone(),
532 event_bus.clone(),
533 alerts.clone(),
534 shutdown.clone(),
535 );
536
537 info!(
538 symbols = ?symbols,
539 category = ?settings.category,
540 metrics_addr = %settings.metrics_addr,
541 state_path = %settings.state_path.display(),
542 history = settings.history,
543 "market stream ready"
544 );
545
546 for symbol in &symbols {
547 let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
548 orchestrator.update_risk_context(symbol.clone(), ctx);
549 }
550
551 Ok(Self {
552 market,
553 orchestrator,
554 state_repo,
555 persisted,
556 event_bus,
557 shutdown,
558 metrics_task,
559 alert_task,
560 reconciliation_task,
561 reconciliation_ctx,
562 private_event_rx,
563 last_private_sync,
564 subscriber_handles,
565 connection_monitors,
566 order_timeout_task,
567 strategy,
568 _public_connection: public_connection,
569 _private_connection: private_connection,
570 })
571 }
572
573 async fn run(mut self) -> Result<()> {
574 info!("live session started");
575 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
576 perform_state_reconciliation(ctx.as_ref())
577 .await
578 .context("initial state reconciliation failed")?;
579 }
580 let backoff = Duration::from_millis(200);
581 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
582
583 while !self.shutdown.triggered() {
584 let mut progressed = false;
585
586 if let Some(tick) = self.market.next_tick().await? {
587 progressed = true;
588 self.event_bus.publish(Event::Tick(TickEvent { tick }));
589 }
590
591 if let Some(candle) = self.market.next_candle().await? {
592 progressed = true;
593 self.event_bus
594 .publish(Event::Candle(CandleEvent { candle }));
595 }
596
597 if let Some(book) = self.market.next_order_book().await? {
598 progressed = true;
599 self.event_bus
600 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
601 }
602
603 tokio::select! {
604 biased;
605 Some(event) = self.private_event_rx.recv() => {
606 progressed = true;
607 match event {
608 BrokerEvent::OrderUpdate(order) => {
609 info!(
610 order_id = %order.id,
611 status = ?order.status,
612 symbol = %order.request.symbol,
613 "received private order update"
614 );
615 self.event_bus
616 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
617 }
618 BrokerEvent::Fill(fill) => {
619 info!(
620 order_id = %fill.order_id,
621 symbol = %fill.symbol,
622 qty = %fill.fill_quantity,
623 price = %fill.fill_price,
624 "received private fill"
625 );
626 self.event_bus.publish(Event::Fill(FillEvent { fill }));
627 }
628 }
629 }
630 _ = orchestrator_timer.tick() => {
631 if let Err(e) = self.orchestrator.on_timer_tick().await {
633 error!("Orchestrator timer tick failed: {}", e);
634 }
635 }
636 else => {}
637 }
638
639 if !progressed && !self.shutdown.sleep(backoff).await {
640 break;
641 }
642 }
643 info!("live session stopping");
644 self.metrics_task.abort();
645 if let Some(handle) = self.alert_task.take() {
646 handle.abort();
647 }
648 if let Some(handle) = self.reconciliation_task.take() {
649 handle.abort();
650 }
651 self.order_timeout_task.abort();
652 for handle in self.subscriber_handles.drain(..) {
653 handle.abort();
654 }
655 for handle in self.connection_monitors.drain(..) {
656 handle.abort();
657 }
658 if let Err(err) = persist_state(
659 self.state_repo.clone(),
660 self.persisted.clone(),
661 Some(self.strategy.clone()),
662 )
663 .await
664 {
665 warn!(error = %err, "failed to persist shutdown state");
666 }
667 Ok(())
668 }
669}
670
671struct ReconciliationContext {
672 client: Arc<dyn ExecutionClient>,
673 portfolio: Arc<Mutex<Portfolio>>,
674 persisted: Arc<Mutex<LiveState>>,
675 state_repo: Arc<dyn StateRepository>,
676 alerts: Arc<AlertManager>,
677 metrics: Arc<LiveMetrics>,
678 reporting_currency: Symbol,
679 threshold: Decimal,
680}
681
682struct ReconciliationContextConfig {
683 client: Arc<dyn ExecutionClient>,
684 portfolio: Arc<Mutex<Portfolio>>,
685 persisted: Arc<Mutex<LiveState>>,
686 state_repo: Arc<dyn StateRepository>,
687 alerts: Arc<AlertManager>,
688 metrics: Arc<LiveMetrics>,
689 reporting_currency: Symbol,
690 threshold: Decimal,
691}
692
693impl ReconciliationContext {
694 fn new(config: ReconciliationContextConfig) -> Self {
695 let ReconciliationContextConfig {
696 client,
697 portfolio,
698 persisted,
699 state_repo,
700 alerts,
701 metrics,
702 reporting_currency,
703 threshold,
704 } = config;
705 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
707 min_threshold
708 } else {
709 threshold
710 };
711 Self {
712 client,
713 portfolio,
714 persisted,
715 state_repo,
716 alerts,
717 metrics,
718 reporting_currency,
719 threshold,
720 }
721 }
722}
723
724fn spawn_reconciliation_loop(
725 ctx: Arc<ReconciliationContext>,
726 shutdown: ShutdownSignal,
727 interval: Duration,
728) -> JoinHandle<()> {
729 tokio::spawn(async move {
730 while shutdown.sleep(interval).await {
731 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
732 error!(error = %err, "periodic state reconciliation failed");
733 }
734 }
735 })
736}
737
738async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
739 info!("running state reconciliation");
740 let remote_positions = ctx
741 .client
742 .positions()
743 .await
744 .context("failed to fetch remote positions")?;
745 let remote_balances = ctx
746 .client
747 .account_balances()
748 .await
749 .context("failed to fetch remote balances")?;
750 let (local_positions, local_cash) = {
751 let guard = ctx.portfolio.lock().await;
752 (guard.positions(), guard.cash())
753 };
754
755 let remote_map = positions_to_map(remote_positions);
756 let local_map = positions_to_map(local_positions);
757 let mut tracked_symbols: HashSet<String> = HashSet::new();
758 tracked_symbols.extend(remote_map.keys().cloned());
759 tracked_symbols.extend(local_map.keys().cloned());
760
761 let mut severe_findings = Vec::new();
762 for symbol in tracked_symbols {
763 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
764 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
765 let diff = (local_qty - remote_qty).abs();
766 let diff_value = diff.to_f64().unwrap_or(0.0);
767 ctx.metrics.update_position_diff(&symbol, diff_value);
768 if diff > Decimal::ZERO {
769 warn!(
770 symbol = %symbol,
771 local = %local_qty,
772 remote = %remote_qty,
773 diff = %diff,
774 "position mismatch detected during reconciliation"
775 );
776 let pct = normalize_diff(diff, remote_qty);
777 if pct >= ctx.threshold {
778 error!(
779 symbol = %symbol,
780 local = %local_qty,
781 remote = %remote_qty,
782 diff = %diff,
783 pct = %pct,
784 "position mismatch exceeds threshold"
785 );
786 severe_findings.push(format!(
787 "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
788 ));
789 }
790 }
791 }
792
793 let reporting = ctx.reporting_currency.as_str();
794 let remote_cash = remote_balances
795 .iter()
796 .find(|balance| balance.currency == reporting)
797 .map(|balance| balance.available)
798 .unwrap_or_else(|| Decimal::ZERO);
799 let cash_diff = (remote_cash - local_cash).abs();
800 ctx.metrics
801 .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
802 if cash_diff > Decimal::ZERO {
803 warn!(
804 currency = %reporting,
805 local = %local_cash,
806 remote = %remote_cash,
807 diff = %cash_diff,
808 "balance mismatch detected during reconciliation"
809 );
810 let pct = normalize_diff(cash_diff, remote_cash);
811 if pct >= ctx.threshold {
812 error!(
813 currency = %reporting,
814 local = %local_cash,
815 remote = %remote_cash,
816 diff = %cash_diff,
817 pct = %pct,
818 "balance mismatch exceeds threshold"
819 );
820 severe_findings.push(format!(
821 "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
822 ));
823 }
824 }
825
826 if severe_findings.is_empty() {
827 info!("state reconciliation complete with no critical divergence");
828 return Ok(());
829 }
830
831 let alert_body = severe_findings.join("; ");
832 ctx.alerts
833 .notify("State reconciliation divergence", &alert_body)
834 .await;
835 enforce_liquidate_only(ctx).await;
836 Ok(())
837}
838
839async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
840 let snapshot = {
841 let mut guard = ctx.portfolio.lock().await;
842 if !guard.set_liquidate_only(true) {
843 return;
844 }
845 info!("entering liquidate-only mode due to reconciliation divergence");
846 guard.snapshot()
847 };
848 {
849 let mut state = ctx.persisted.lock().await;
850 state.portfolio = Some(snapshot);
851 }
852 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
853 warn!(error = %err, "failed to persist liquidate-only transition");
854 }
855}
856
857fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
858 let mut map = HashMap::new();
859 for position in positions {
860 map.insert(position.symbol.clone(), position_signed_qty(&position));
861 }
862 map
863}
864
865fn position_signed_qty(position: &Position) -> Decimal {
866 match position.side {
867 Some(Side::Buy) => position.quantity,
868 Some(Side::Sell) => -position.quantity,
869 None => Decimal::ZERO,
870 }
871}
872
873fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
874 if diff <= Decimal::ZERO {
875 Decimal::ZERO
876 } else {
877 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
878 diff / denominator
879 }
880}
881
882fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
883 let mut payload = serde_json::Map::new();
884 payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
885 payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
886 payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
887 payload.insert(
888 "api_secret".into(),
889 Value::String(exchange.api_secret.clone()),
890 );
891 payload.insert(
892 "category".into(),
893 Value::String(settings.category.as_path().to_string()),
894 );
895 payload.insert(
896 "orderbook_depth".into(),
897 Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
898 );
899 if let Value::Object(extra) = exchange.params.clone() {
900 for (key, value) in extra {
901 payload.insert(key, value);
902 }
903 }
904 Value::Object(payload)
905}
906
907#[derive(Default)]
908struct MarketSnapshot {
909 last_trade: Option<Price>,
910 last_trade_ts: Option<DateTime<Utc>>,
911 last_candle: Option<Candle>,
912}
913
914impl MarketSnapshot {
915 fn price(&self) -> Option<Price> {
916 self.last_trade
917 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
918 }
919}
920
921#[derive(Clone)]
922pub struct ShutdownSignal {
923 flag: Arc<AtomicBool>,
924 notify: Arc<Notify>,
925}
926
927impl ShutdownSignal {
928 pub fn new() -> Self {
929 let flag = Arc::new(AtomicBool::new(false));
930 let notify = Arc::new(Notify::new());
931 let flag_clone = flag.clone();
932 let notify_clone = notify.clone();
933 tokio::spawn(async move {
934 if tokio::signal::ctrl_c().await.is_ok() {
935 flag_clone.store(true, Ordering::SeqCst);
936 notify_clone.notify_waiters();
937 }
938 });
939 Self { flag, notify }
940 }
941
942 pub fn trigger(&self) {
943 self.flag.store(true, Ordering::SeqCst);
944 self.notify.notify_waiters();
945 }
946
947 fn triggered(&self) -> bool {
948 self.flag.load(Ordering::SeqCst)
949 }
950
951 #[cfg_attr(not(feature = "binance"), allow(dead_code))]
952 async fn wait(&self) {
953 if self.triggered() {
954 return;
955 }
956 self.notify.notified().await;
957 }
958
959 async fn sleep(&self, duration: Duration) -> bool {
960 tokio::select! {
961 _ = tokio::time::sleep(duration) => true,
962 _ = self.notify.notified() => false,
963 }
964 }
965}
966
967impl Default for ShutdownSignal {
968 fn default() -> Self {
969 Self::new()
970 }
971}
972
973#[allow(clippy::too_many_arguments)]
974fn spawn_event_subscribers(
975 bus: Arc<EventBus>,
976 strategy: Arc<Mutex<Box<dyn Strategy>>>,
977 strategy_ctx: Arc<Mutex<StrategyContext>>,
978 orchestrator: Arc<OrderOrchestrator>,
979 portfolio: Arc<Mutex<Portfolio>>,
980 metrics: Arc<LiveMetrics>,
981 alerts: Arc<AlertManager>,
982 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
983 state_repo: Arc<dyn StateRepository>,
984 persisted: Arc<Mutex<LiveState>>,
985 exec_backend: ExecutionBackend,
986) -> Vec<JoinHandle<()>> {
987 let mut handles = Vec::new();
988
989 let market_bus = bus.clone();
990 let market_strategy = strategy.clone();
991 let market_ctx = strategy_ctx.clone();
992 let market_metrics = metrics.clone();
993 let market_alerts = alerts.clone();
994 let market_state = state_repo.clone();
995 let market_persisted = persisted.clone();
996 let market_portfolio = portfolio.clone();
997 let market_snapshot = market.clone();
998 let orchestrator_clone = orchestrator.clone();
999 handles.push(tokio::spawn(async move {
1000 let mut stream = market_bus.subscribe();
1001 loop {
1002 match stream.recv().await {
1003 Ok(Event::Tick(evt)) => {
1004 if let Err(err) = process_tick_event(
1005 evt.tick,
1006 market_strategy.clone(),
1007 market_ctx.clone(),
1008 market_metrics.clone(),
1009 market_alerts.clone(),
1010 market_snapshot.clone(),
1011 market_portfolio.clone(),
1012 market_state.clone(),
1013 market_persisted.clone(),
1014 market_bus.clone(),
1015 )
1016 .await
1017 {
1018 warn!(error = %err, "tick handler failed");
1019 }
1020 }
1021 Ok(Event::Candle(evt)) => {
1022 if let Err(err) = process_candle_event(
1023 evt.candle,
1024 market_strategy.clone(),
1025 market_ctx.clone(),
1026 market_metrics.clone(),
1027 market_alerts.clone(),
1028 market_snapshot.clone(),
1029 market_portfolio.clone(),
1030 orchestrator_clone.clone(),
1031 exec_backend,
1032 market_state.clone(),
1033 market_persisted.clone(),
1034 market_bus.clone(),
1035 )
1036 .await
1037 {
1038 warn!(error = %err, "candle handler failed");
1039 }
1040 }
1041 Ok(Event::OrderBook(evt)) => {
1042 if let Err(err) = process_order_book_event(
1043 evt.order_book,
1044 market_strategy.clone(),
1045 market_ctx.clone(),
1046 market_metrics.clone(),
1047 market_alerts.clone(),
1048 market_snapshot.clone(),
1049 market_bus.clone(),
1050 )
1051 .await
1052 {
1053 warn!(error = %err, "order book handler failed");
1054 }
1055 }
1056 Ok(_) => {}
1057 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1058 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1059 warn!(lag = lag, "market subscriber lagged");
1060 continue;
1061 }
1062 }
1063 }
1064 }));
1065
1066 let exec_bus = bus.clone();
1067 let exec_portfolio = portfolio.clone();
1068 let exec_market = market.clone();
1069 let exec_persisted = persisted.clone();
1070 let exec_alerts = alerts.clone();
1071 let exec_metrics = metrics.clone();
1072 let exec_orchestrator = orchestrator.clone();
1073 handles.push(tokio::spawn(async move {
1074 let orchestrator = exec_orchestrator.clone();
1075 let mut stream = exec_bus.subscribe();
1076 loop {
1077 match stream.recv().await {
1078 Ok(Event::Signal(evt)) => {
1079 if let Err(err) = process_signal_event(
1080 evt.signal,
1081 orchestrator.clone(),
1082 exec_portfolio.clone(),
1083 exec_market.clone(),
1084 exec_persisted.clone(),
1085 exec_alerts.clone(),
1086 exec_metrics.clone(),
1087 )
1088 .await
1089 {
1090 warn!(error = %err, "signal handler failed");
1091 }
1092 }
1093 Ok(_) => {}
1094 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1095 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1096 warn!(lag = lag, "signal subscriber lagged");
1097 continue;
1098 }
1099 }
1100 }
1101 }));
1102
1103 let fill_bus = bus.clone();
1104 let fill_state = state_repo.clone();
1105 let fill_orchestrator = orchestrator.clone();
1106 let fill_persisted = persisted.clone();
1107 let fill_alerts = alerts.clone();
1108 handles.push(tokio::spawn(async move {
1109 let orchestrator = fill_orchestrator.clone();
1110 let persisted = fill_persisted.clone();
1111 let mut stream = fill_bus.subscribe();
1112 loop {
1113 match stream.recv().await {
1114 Ok(Event::Fill(evt)) => {
1115 if let Err(err) = process_fill_event(
1116 evt.fill,
1117 portfolio.clone(),
1118 strategy.clone(),
1119 strategy_ctx.clone(),
1120 orchestrator.clone(),
1121 metrics.clone(),
1122 fill_alerts.clone(),
1123 fill_state.clone(),
1124 persisted.clone(),
1125 )
1126 .await
1127 {
1128 warn!(error = %err, "fill handler failed");
1129 }
1130 }
1131 Ok(_) => {}
1132 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1133 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1134 warn!(lag = lag, "fill subscriber lagged");
1135 continue;
1136 }
1137 }
1138 }
1139 }));
1140
1141 let order_bus = bus.clone();
1142 let order_persisted = persisted.clone();
1143 let order_alerts = alerts.clone();
1144 let order_orchestrator = orchestrator.clone();
1145 handles.push(tokio::spawn(async move {
1148 let orchestrator = order_orchestrator.clone();
1149 let persisted = order_persisted.clone();
1150 let mut stream = order_bus.subscribe();
1151 loop {
1152 match stream.recv().await {
1153 Ok(Event::OrderUpdate(evt)) => {
1154 if let Err(err) = process_order_update_event(
1155 evt.order,
1156 orchestrator.clone(),
1157 order_alerts.clone(),
1158 state_repo.clone(),
1159 persisted.clone(),
1160 )
1161 .await
1162 {
1163 warn!(error = %err, "order update handler failed");
1164 }
1165 }
1166 Ok(_) => {}
1167 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1168 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1169 warn!(lag = lag, "order subscriber lagged");
1170 continue;
1171 }
1172 }
1173 }
1174 }));
1175
1176 handles
1177}
1178
1179#[allow(clippy::too_many_arguments)]
1180async fn process_tick_event(
1181 tick: Tick,
1182 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1183 strategy_ctx: Arc<Mutex<StrategyContext>>,
1184 metrics: Arc<LiveMetrics>,
1185 alerts: Arc<AlertManager>,
1186 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1187 portfolio: Arc<Mutex<Portfolio>>,
1188 state_repo: Arc<dyn StateRepository>,
1189 persisted: Arc<Mutex<LiveState>>,
1190 bus: Arc<EventBus>,
1191) -> Result<()> {
1192 metrics.inc_tick();
1193 metrics.update_staleness(0.0);
1194 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1195 alerts.heartbeat().await;
1196 {
1197 let mut guard = market.lock().await;
1198 if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1199 snapshot.last_trade = Some(tick.price);
1200 snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1201 }
1202 }
1203 let mut drawdown_triggered = false;
1204 let mut snapshot_on_trigger = None;
1205 {
1206 let mut guard = portfolio.lock().await;
1207 let was_liquidate_only = guard.liquidate_only();
1208 match guard.update_market_data(&tick.symbol, tick.price) {
1209 Ok(_) => {
1210 if !was_liquidate_only && guard.liquidate_only() {
1211 drawdown_triggered = true;
1212 snapshot_on_trigger = Some(guard.snapshot());
1213 }
1214 }
1215 Err(err) => {
1216 warn!(
1217 symbol = %tick.symbol,
1218 error = %err,
1219 "failed to refresh market data"
1220 );
1221 }
1222 }
1223 }
1224 {
1225 let mut state = persisted.lock().await;
1226 state.last_prices.insert(tick.symbol.clone(), tick.price);
1227 if drawdown_triggered {
1228 if let Some(snapshot) = snapshot_on_trigger.take() {
1229 state.portfolio = Some(snapshot);
1230 }
1231 }
1232 }
1233 if drawdown_triggered {
1234 persist_state(
1235 state_repo.clone(),
1236 persisted.clone(),
1237 Some(strategy.clone()),
1238 )
1239 .await?;
1240 alert_liquidate_only(alerts.clone()).await;
1241 }
1242 {
1243 let mut ctx = strategy_ctx.lock().await;
1244 ctx.push_tick(tick.clone());
1245 let lock_start = Instant::now();
1246 let mut strat = strategy.lock().await;
1247 log_strategy_lock("tick", lock_start.elapsed());
1248 let call_start = Instant::now();
1249 strat
1250 .on_tick(&ctx, &tick)
1251 .await
1252 .context("strategy failure on tick event")?;
1253 log_strategy_call("tick", call_start.elapsed());
1254 }
1255 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1256 Ok(())
1257}
1258
1259#[allow(clippy::too_many_arguments)]
1260async fn process_candle_event(
1261 candle: Candle,
1262 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1263 strategy_ctx: Arc<Mutex<StrategyContext>>,
1264 metrics: Arc<LiveMetrics>,
1265 alerts: Arc<AlertManager>,
1266 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1267 portfolio: Arc<Mutex<Portfolio>>,
1268 orchestrator: Arc<OrderOrchestrator>,
1269 exec_backend: ExecutionBackend,
1270 state_repo: Arc<dyn StateRepository>,
1271 persisted: Arc<Mutex<LiveState>>,
1272 bus: Arc<EventBus>,
1273) -> Result<()> {
1274 metrics.inc_candle();
1275 metrics.update_staleness(0.0);
1276 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1277 alerts.heartbeat().await;
1278 metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1279 {
1280 let mut guard = market.lock().await;
1281 if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1282 snapshot.last_candle = Some(candle.clone());
1283 snapshot.last_trade = Some(candle.close);
1284 }
1285 }
1286 if exec_backend.is_paper() {
1287 let client = orchestrator.execution_engine().client();
1288 if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1289 paper.update_price(&candle.symbol, candle.close);
1290 }
1291 }
1292 let mut candle_drawdown_triggered = false;
1293 let mut candle_snapshot = None;
1294 {
1295 let mut guard = portfolio.lock().await;
1296 let was_liquidate_only = guard.liquidate_only();
1297 match guard.update_market_data(&candle.symbol, candle.close) {
1298 Ok(_) => {
1299 if !was_liquidate_only && guard.liquidate_only() {
1300 candle_drawdown_triggered = true;
1301 candle_snapshot = Some(guard.snapshot());
1302 }
1303 }
1304 Err(err) => {
1305 warn!(
1306 symbol = %candle.symbol,
1307 error = %err,
1308 "failed to refresh market data"
1309 );
1310 }
1311 }
1312 }
1313 if candle_drawdown_triggered {
1314 if let Some(snapshot) = candle_snapshot.take() {
1315 let mut persisted_guard = persisted.lock().await;
1316 persisted_guard.portfolio = Some(snapshot);
1317 }
1318 alert_liquidate_only(alerts.clone()).await;
1319 }
1320 {
1321 let mut ctx = strategy_ctx.lock().await;
1322 ctx.push_candle(candle.clone());
1323 let lock_start = Instant::now();
1324 let mut strat = strategy.lock().await;
1325 log_strategy_lock("candle", lock_start.elapsed());
1326 let call_start = Instant::now();
1327 strat
1328 .on_candle(&ctx, &candle)
1329 .await
1330 .context("strategy failure on candle event")?;
1331 log_strategy_call("candle", call_start.elapsed());
1332 }
1333 {
1334 let mut snapshot = persisted.lock().await;
1335 snapshot.last_candle_ts = Some(candle.timestamp);
1336 snapshot
1337 .last_prices
1338 .insert(candle.symbol.clone(), candle.close);
1339 }
1340 persist_state(
1341 state_repo.clone(),
1342 persisted.clone(),
1343 Some(strategy.clone()),
1344 )
1345 .await?;
1346 let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1347 orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1348 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1349 Ok(())
1350}
1351
1352async fn process_order_book_event(
1353 book: OrderBook,
1354 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1355 strategy_ctx: Arc<Mutex<StrategyContext>>,
1356 metrics: Arc<LiveMetrics>,
1357 alerts: Arc<AlertManager>,
1358 _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1359 bus: Arc<EventBus>,
1360) -> Result<()> {
1361 metrics.update_staleness(0.0);
1362 alerts.heartbeat().await;
1363 {
1364 let mut ctx = strategy_ctx.lock().await;
1365 ctx.push_order_book(book.clone());
1366 let lock_start = Instant::now();
1367 let mut strat = strategy.lock().await;
1368 log_strategy_lock("order_book", lock_start.elapsed());
1369 let call_start = Instant::now();
1370 strat
1371 .on_order_book(&ctx, &book)
1372 .await
1373 .context("strategy failure on order book")?;
1374 log_strategy_call("order_book", call_start.elapsed());
1375 }
1376 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1377 Ok(())
1378}
1379
1380async fn process_signal_event(
1381 signal: Signal,
1382 orchestrator: Arc<OrderOrchestrator>,
1383 portfolio: Arc<Mutex<Portfolio>>,
1384 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1385 persisted: Arc<Mutex<LiveState>>,
1386 alerts: Arc<AlertManager>,
1387 metrics: Arc<LiveMetrics>,
1388) -> Result<()> {
1389 let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1390 orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1391 match orchestrator.on_signal(&signal, &ctx).await {
1392 Ok(()) => {
1393 alerts.reset_order_failures().await;
1394 }
1395 Err(err) => {
1396 metrics.inc_order_failure();
1397 alerts
1398 .order_failure(&format!("orchestrator error: {err}"))
1399 .await;
1400 }
1401 }
1402 Ok(())
1403}
1404
1405fn log_strategy_lock(event: &str, wait: Duration) {
1406 let wait_ms = wait.as_secs_f64() * 1000.0;
1407 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1408 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1409 } else {
1410 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1411 }
1412}
1413
1414fn log_strategy_call(event: &str, elapsed: Duration) {
1415 let duration_ms = elapsed.as_secs_f64() * 1000.0;
1416 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1417 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1418 } else {
1419 trace!(target: "strategy", event, duration_ms, "strategy call completed");
1420 }
1421}
1422
1423#[allow(clippy::too_many_arguments)]
1424async fn process_fill_event(
1425 fill: Fill,
1426 portfolio: Arc<Mutex<Portfolio>>,
1427 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1428 strategy_ctx: Arc<Mutex<StrategyContext>>,
1429 orchestrator: Arc<OrderOrchestrator>,
1430 metrics: Arc<LiveMetrics>,
1431 alerts: Arc<AlertManager>,
1432 state_repo: Arc<dyn StateRepository>,
1433 persisted: Arc<Mutex<LiveState>>,
1434) -> Result<()> {
1435 let mut drawdown_triggered = false;
1436 {
1437 let mut guard = portfolio.lock().await;
1438 let was_liquidate_only = guard.liquidate_only();
1439 guard
1440 .apply_fill(&fill)
1441 .context("Failed to apply fill to portfolio")?;
1442 if !was_liquidate_only && guard.liquidate_only() {
1443 drawdown_triggered = true;
1444 }
1445 let snapshot = guard.snapshot();
1446 let mut persisted_guard = persisted.lock().await;
1447 persisted_guard.portfolio = Some(snapshot);
1448 }
1449 {
1450 let positions = {
1451 let guard = portfolio.lock().await;
1452 guard.positions()
1453 };
1454 let mut ctx = strategy_ctx.lock().await;
1455 ctx.update_positions(positions);
1456 }
1457 orchestrator.on_fill(&fill).await.ok();
1458 {
1459 let ctx = strategy_ctx.lock().await;
1460 let lock_start = Instant::now();
1461 let mut strat = strategy.lock().await;
1462 log_strategy_lock("fill", lock_start.elapsed());
1463 let call_start = Instant::now();
1464 strat
1465 .on_fill(&ctx, &fill)
1466 .await
1467 .context("Strategy failed on fill event")?;
1468 log_strategy_call("fill", call_start.elapsed());
1469 }
1470 let equity = {
1471 let guard = portfolio.lock().await;
1472 guard.equity()
1473 };
1474 if let Some(value) = equity.to_f64() {
1475 metrics.update_equity(value);
1476 }
1477 alerts.update_equity(equity).await;
1478 metrics.inc_order();
1479 alerts
1480 .notify(
1481 "Order Filled",
1482 &format!(
1483 "order filled: {}@{} ({})",
1484 fill.fill_quantity,
1485 fill.fill_price,
1486 match fill.side {
1487 Side::Buy => "buy",
1488 Side::Sell => "sell",
1489 }
1490 ),
1491 )
1492 .await;
1493 if drawdown_triggered {
1494 alert_liquidate_only(alerts.clone()).await;
1495 }
1496 persist_state(
1497 state_repo.clone(),
1498 persisted.clone(),
1499 Some(strategy.clone()),
1500 )
1501 .await?;
1502 Ok(())
1503}
1504
1505async fn process_order_update_event(
1506 order: Order,
1507 orchestrator: Arc<OrderOrchestrator>,
1508 alerts: Arc<AlertManager>,
1509 state_repo: Arc<dyn StateRepository>,
1510 persisted: Arc<Mutex<LiveState>>,
1511) -> Result<()> {
1512 orchestrator.on_order_update(&order);
1513 if matches!(order.status, OrderStatus::Rejected) {
1514 error!(
1515 order_id = %order.id,
1516 symbol = %order.request.symbol,
1517 "order rejected by exchange"
1518 );
1519 alerts.order_failure("order rejected by exchange").await;
1520 alerts
1521 .notify(
1522 "Order rejected",
1523 &format!(
1524 "Order {} for {} was rejected",
1525 order.id, order.request.symbol
1526 ),
1527 )
1528 .await;
1529 }
1530 {
1531 let mut snapshot = persisted.lock().await;
1532 let mut found = false;
1533 for existing in &mut snapshot.open_orders {
1534 if existing.id == order.id {
1535 *existing = order.clone();
1536 found = true;
1537 break;
1538 }
1539 }
1540 if !found {
1541 snapshot.open_orders.push(order.clone());
1542 }
1543 if matches!(
1544 order.status,
1545 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1546 ) {
1547 snapshot.open_orders.retain(|o| o.id != order.id);
1548 }
1549 }
1550 persist_state(state_repo, persisted, None).await?;
1551 Ok(())
1552}
1553
1554async fn emit_signals(
1555 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1556 bus: Arc<EventBus>,
1557 metrics: Arc<LiveMetrics>,
1558) {
1559 let signals = {
1560 let mut strat = strategy.lock().await;
1561 strat.drain_signals()
1562 };
1563 if signals.is_empty() {
1564 return;
1565 }
1566 metrics.inc_signals(signals.len());
1567 for signal in signals {
1568 bus.publish(Event::Signal(SignalEvent { signal }));
1569 }
1570}
1571
1572async fn persist_state(
1573 repo: Arc<dyn StateRepository>,
1574 persisted: Arc<Mutex<LiveState>>,
1575 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1576) -> Result<()> {
1577 if let Some(strat_lock) = strategy {
1578 let strat = strat_lock.lock().await;
1580 if let Ok(json_state) = strat.snapshot() {
1581 let mut guard = persisted.lock().await;
1582 guard.strategy_state = Some(json_state);
1583 } else {
1584 warn!("failed to snapshot strategy state");
1585 }
1586 }
1587
1588 let snapshot = {
1589 let guard = persisted.lock().await;
1590 guard.clone()
1591 };
1592 tokio::task::spawn_blocking(move || repo.save(&snapshot))
1593 .await
1594 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1595 .map_err(|err| anyhow!(err.to_string()))
1596}
1597
1598async fn shared_risk_context(
1599 symbol: &str,
1600 portfolio: &Arc<Mutex<Portfolio>>,
1601 market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1602 persisted: &Arc<Mutex<LiveState>>,
1603) -> RiskContext {
1604 let (signed_qty, equity, liquidate_only) = {
1605 let guard = portfolio.lock().await;
1606 (
1607 guard.signed_position_qty(symbol),
1608 guard.equity(),
1609 guard.liquidate_only(),
1610 )
1611 };
1612 let observed_price = {
1613 let guard = market.lock().await;
1614 guard.get(symbol).and_then(|snapshot| snapshot.price())
1615 };
1616 let last_price = if let Some(price) = observed_price {
1617 price
1618 } else {
1619 let guard = persisted.lock().await;
1620 guard
1621 .last_prices
1622 .get(symbol)
1623 .copied()
1624 .unwrap_or(Decimal::ZERO)
1625 };
1626 RiskContext {
1627 signed_position_qty: signed_qty,
1628 portfolio_equity: equity,
1629 last_price,
1630 liquidate_only,
1631 }
1632}
1633
1634async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1635 alerts
1636 .notify(
1637 "Max drawdown triggered",
1638 "Portfolio entered liquidate-only mode; new exposure blocked until review",
1639 )
1640 .await;
1641}
1642
1643fn spawn_connection_monitor(
1644 shutdown: ShutdownSignal,
1645 flag: Arc<AtomicBool>,
1646 metrics: Arc<LiveMetrics>,
1647 stream: &'static str,
1648) -> JoinHandle<()> {
1649 tokio::spawn(async move {
1650 loop {
1651 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1652 if !shutdown.sleep(Duration::from_secs(5)).await {
1653 break;
1654 }
1655 }
1656 })
1657}
1658
1659fn spawn_order_timeout_monitor(
1660 orchestrator: Arc<OrderOrchestrator>,
1661 bus: Arc<EventBus>,
1662 alerts: Arc<AlertManager>,
1663 shutdown: ShutdownSignal,
1664) -> JoinHandle<()> {
1665 tokio::spawn(async move {
1666 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1667 loop {
1668 ticker.tick().await;
1669 if shutdown.triggered() {
1670 break;
1671 }
1672 match orchestrator.poll_stale_orders().await {
1673 Ok(updates) => {
1674 for order in updates {
1675 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1676 let message = format!(
1677 "Order {} for {} timed out after {}s",
1678 order.id,
1679 order.request.symbol,
1680 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1681 );
1682 error!(%message);
1683 alerts.order_failure(&message).await;
1684 alerts.notify("Order timeout", &message).await;
1685 }
1686 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1687 }
1688 }
1689 Err(err) => {
1690 warn!(error = %err, "order timeout monitor failed");
1691 }
1692 }
1693 }
1694 })
1695}
1696
1697async fn load_market_registry(
1698 client: Arc<dyn ExecutionClient>,
1699 settings: &LiveSessionSettings,
1700) -> Result<Arc<MarketRegistry>> {
1701 if let Some(path) = &settings.markets_file {
1702 let registry = MarketRegistry::load_from_file(path)
1703 .with_context(|| format!("failed to load markets from {}", path.display()))?;
1704 return Ok(Arc::new(registry));
1705 }
1706
1707 if settings.exec_backend.is_paper() {
1708 return Err(anyhow!(
1709 "paper execution requires --markets-file when exchange metadata is unavailable"
1710 ));
1711 }
1712
1713 let instruments = client
1714 .list_instruments(settings.category.as_path())
1715 .await
1716 .context("failed to fetch instruments from execution client")?;
1717 let registry =
1718 MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1719 Ok(Arc::new(registry))
1720}
1721
/// Spawns a background task that maintains the Bybit private WebSocket session.
///
/// On every successful connection the task does the following:
/// - Marks the private stream as connected.
/// - Replays open orders and recent executions over REST
///   (`reconcile_bybit_private_state`).
/// - Forwards `order` and `execution` topic frames to `private_tx` as `BrokerEvent`s
///   until the socket closes or shutdown is signalled.
///
/// When the session ends, the connected status is cleared. The task waits five seconds
/// before reconnecting, both after a closed session and after a failed connection
/// attempt.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    set_bybit_private_status(private_connection_flag.as_deref(), &metrics, true);
                    info!("Connected to Bybit private WebSocket stream");
                    reconcile_bybit_private_state(&exec_client, &symbols, &last_sync, &private_tx)
                        .await;

                    loop {
                        if shutdown.triggered() {
                            break;
                        }
                        // Also wake on shutdown: the private stream can be quiet for long
                        // stretches, so waiting only on the next frame could stall shutdown.
                        let next = tokio::select! {
                            next = socket.next() => next,
                            _ = shutdown.wait() => break,
                        };
                        let Some(frame) = next else {
                            break;
                        };
                        if let Ok(Message::Text(text)) = frame {
                            forward_bybit_private_message(&text, &private_tx).await;
                        }
                    }

                    // The session is over (socket closed or shutdown): stop reporting the
                    // stream as connected before deciding whether to reconnect.
                    set_bybit_private_status(private_connection_flag.as_deref(), &metrics, false);
                    if shutdown.triggered() {
                        break;
                    }
                    warn!("Bybit private WebSocket stream closed. Reconnecting...");
                    // Back off so a server that accepts and immediately drops connections
                    // does not trigger a tight reconnect + REST reconciliation loop.
                    if !shutdown.sleep(Duration::from_secs(5)).await {
                        break;
                    }
                }
                Err(e) => {
                    set_bybit_private_status(private_connection_flag.as_deref(), &metrics, false);
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
            if shutdown.triggered() {
                break;
            }
        }
    });
}

/// Publishes the private-stream connection state to the shared flag (if any) and metrics.
#[cfg(feature = "bybit")]
fn set_bybit_private_status(flag: Option<&AtomicBool>, metrics: &LiveMetrics, connected: bool) {
    if let Some(flag) = flag {
        flag.store(connected, Ordering::SeqCst);
    }
    metrics.update_connection_status("private", connected);
}

/// Replays state that may have been missed while the private stream was down.
///
/// Re-sends every open order for `symbols`. When the execution client is a `BybitClient`,
/// it also re-sends every execution since the last successful sync, defaulting to the
/// last 30 minutes. `last_sync` only advances when the execution fetch succeeds, so a
/// failed reconciliation is retried from the same point on the next connection.
#[cfg(feature = "bybit")]
async fn reconcile_bybit_private_state(
    exec_client: &Arc<dyn ExecutionClient>,
    symbols: &[String],
    last_sync: &tokio::sync::Mutex<Option<DateTime<Utc>>>,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    for symbol in symbols {
        match exec_client.list_open_orders(symbol).await {
            Ok(orders) => {
                for order in orders {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send reconciled order update: {err}");
                    }
                }
            }
            Err(e) => {
                error!("failed to reconcile open orders for {symbol}: {e}");
            }
        }
    }

    let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() else {
        return;
    };
    let since = {
        let guard = last_sync.lock().await;
        guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
    };
    match bybit.list_executions_since(since).await {
        Ok(fills) => {
            for fill in fills {
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send reconciled fill: {err}");
                }
            }
            *last_sync.lock().await = Some(Utc::now());
        }
        Err(e) => {
            error!("failed to reconcile executions since {:?}: {}", since, e);
        }
    }
}

/// Decodes one text frame from the Bybit private stream and forwards it as broker events.
///
/// Only the `order` and `execution` topics are handled. Frames that are not valid JSON,
/// carry no string `topic`, belong to another topic, or fail to convert are ignored.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    let Ok(value) = serde_json::from_str::<Value>(text) else {
        return;
    };
    let Some(topic) = value.get("topic").and_then(Value::as_str).map(str::to_owned) else {
        return;
    };
    match topic.as_str() {
        "order" => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) else {
                return;
            };
            for update in msg.data {
                if let Ok(order) = update.to_tesser_order(None) {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send private order update: {err}");
                    }
                }
            }
        }
        "execution" => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
            else {
                return;
            };
            for exec in msg.data {
                if let Ok(fill) = exec.to_tesser_fill() {
                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                        error!("failed to send private fill event: {err}");
                    }
                }
            }
        }
        _ => {}
    }
}
1859
/// Spawns a background task that maintains the Binance user-data (private) stream.
///
/// Each iteration does the following:
/// - Obtains a listen key via `start_user_stream`.
/// - Connects a `BinanceUserDataStream` and marks the private stream as connected.
/// - Registers an `on_event` callback that forwards order updates and fills to
///   `private_tx` as `BrokerEvent`s.
/// - Starts a keepalive task that refreshes the listen key every 30 minutes.
///
/// The iteration ends when a `ListenKeyExpired` event arrives or shutdown is signalled.
/// After a session or a failed connect, the connected status is cleared and the loop
/// retries after 5 seconds. The task returns immediately if the execution client is not
/// a `BinanceClient`.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            // Re-checked each iteration; a non-Binance client ends the task for good.
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    // Capacity 1 with `try_send`: repeated expiry events collapse into a
                    // single pending reconnect request.
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    user_stream.on_event(move |event| {
                        // NOTE(review): tokio's `Sender::blocking_send` panics when called
                        // from within an async runtime context. This is only safe if
                        // `on_event` runs the callback on a non-runtime thread — confirm
                        // against tesser_binance before relying on it.
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    // Refreshes the listen key every 30 minutes. tokio's `interval`
                    // completes its first tick immediately, so one keepalive is sent right
                    // after connecting. The task stops silently on the first keepalive
                    // error.
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if client.keepalive_user_stream().await.is_err() {
                                break;
                            }
                        }
                    });
                    // Only listen-key expiry or shutdown end this session here.
                    // NOTE(review): a transport-level disconnect is not observed by this
                    // select unless `BinanceUserDataStream` surfaces it as an expiry
                    // event or reconnects internally — confirm.
                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {
                            // Returns without clearing the connection flag/metric below.
                            keepalive_handle.abort();
                            let _ = user_stream.unsubscribe().await;
                            return;
                        }
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            // Session over (or never established): report the stream as disconnected.
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}