1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, AtomicI64, Ordering},
6 Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23 static INIT: Once = Once::new();
24 INIT.call_once(|| {
25 register_connector_factory(Arc::new(PaperFactory::default()));
26 #[cfg(feature = "bybit")]
27 register_bybit_factory();
28 #[cfg(feature = "binance")]
29 register_binance_factory();
30 });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35 fill_from_update, order_from_update, register_factory as register_binance_factory,
36 ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37 BinanceClient,
38};
39use tesser_broker::{
40 get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41 ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
48use tesser_core::{
49 AccountBalance, Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price,
50 Quantity, Side, Signal, Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55 TickEvent,
56};
57use tesser_execution::{
58 AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
59 PreTradeRiskChecker, RiskContext, RiskLimits, SqliteAlgoStateRepository, StoredAlgoState,
60};
61use tesser_journal::LmdbJournal;
62use tesser_markets::MarketRegistry;
63use tesser_paper::{PaperExecutionClient, PaperFactory};
64use tesser_portfolio::{
65 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
66};
67use tesser_strategy::{Strategy, StrategyContext};
68
69use crate::alerts::{AlertDispatcher, AlertManager};
70use crate::control;
71use crate::telemetry::{spawn_metrics_server, LiveMetrics};
72use crate::PublicChannel;
73
/// Private (account-level) event forwarded from an exchange's private stream into the
/// live run loop, which logs it and republishes it on the event bus.
#[derive(Debug)]
pub enum BrokerEvent {
    /// An order state change reported by the venue.
    OrderUpdate(Order),
    /// An execution (fill) reported by the venue.
    Fill(Fill),
}
80
// Where orders are executed; parsed from the CLI as a kebab-case clap `ValueEnum`.
// (Plain `//` comments on purpose: `///` on variants would become clap help text.)
#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum ExecutionBackend {
    // Simulated execution; no remote bootstrap, private stream, or reconciliation.
    Paper,
    // Real execution through the driver's client, with remote bootstrap, a private
    // order/fill stream, and periodic state reconciliation.
    Live,
}
87
88impl ExecutionBackend {
89 fn is_paper(self) -> bool {
90 matches!(self, Self::Paper)
91 }
92}
93
// CLI-facing choice of persistence engine; converted into `PersistenceEngine` below.
// (Plain `//` comments on purpose: `///` on variants would become clap help text.)
#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum PersistenceBackend {
    // SQLite files: one for live state, a sibling `*.algos.db` for algo state.
    Sqlite,
    // A single LMDB journal holding both live state and algo state.
    Lmdb,
}
100
101impl From<PersistenceBackend> for PersistenceEngine {
102 fn from(value: PersistenceBackend) -> Self {
103 match value {
104 PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
105 PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
106 }
107 }
108}
109
/// Default order-book depth, exposed through [`default_order_book_depth`]. Presumably the
/// number of price levels requested per side — TODO confirm against the connectors.
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns [`DEFAULT_ORDER_BOOK_DEPTH`]. Declared `const fn` so it is usable in const
/// contexts (presumably as a config/CLI default — verify against callers).
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}
// NOTE(review): the two thresholds below are not used in this chunk; by name they bound
// how long acquiring the strategy lock / running a strategy callback may take before a
// warning is emitted — confirm at the use sites.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
117
/// Source of public market data consumed by the live run loop.
///
/// The run loop calls all three methods on every iteration and treats `Ok(None)` from all
/// of them (with no private event pending) as idle, backing off briefly. Implementations
/// are therefore presumably expected to return promptly instead of blocking until data
/// arrives — TODO confirm per connector. An `Err` ends the session.
#[async_trait::async_trait]
trait LiveMarketStream: Send {
    /// Next trade tick, or `None` if none is available.
    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
    /// Next candle, or `None` if none is available.
    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
    /// Next order-book update, or `None` if none is available.
    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
}
124
/// Adapts a factory-created [`ConnectorStream`] to [`LiveMarketStream`] by plain delegation.
struct FactoryStreamAdapter {
    /// The connector stream every call is forwarded to.
    inner: Box<dyn ConnectorStream>,
}
128
129impl FactoryStreamAdapter {
130 fn new(inner: Box<dyn ConnectorStream>) -> Self {
131 Self { inner }
132 }
133}
134
135#[async_trait::async_trait]
136impl LiveMarketStream for FactoryStreamAdapter {
137 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
138 self.inner.next_tick().await
139 }
140
141 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
142 self.inner.next_candle().await
143 }
144
145 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
146 self.inner.next_order_book().await
147 }
148}
149
/// Where and how session state is persisted.
#[derive(Clone)]
pub struct PersistenceSettings {
    /// Storage engine used for both live state and algo state.
    pub engine: PersistenceEngine,
    /// Live-state location; for `Lmdb` this is the journal path that is opened.
    pub state_path: PathBuf,
    /// Algo-state location, derived from `state_path` by `PersistenceSettings::new`.
    pub algo_path: PathBuf,
}
156
157impl PersistenceSettings {
158 pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
159 let algo_path = match engine {
160 PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
161 PersistenceEngine::Lmdb => state_path.clone(),
162 };
163 Self {
164 engine,
165 state_path,
166 algo_path,
167 }
168 }
169
170 fn algo_repo_path(&self) -> &PathBuf {
171 &self.algo_path
172 }
173}
174
/// Repositories opened by `build_persistence_handles` for the configured engine.
struct PersistenceHandles {
    /// Live-state repository (portfolio snapshot, strategy state, open orders, ...).
    state: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Algo-state repository handed to the order orchestrator.
    algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
}
179
/// Everything a live (or paper) session needs besides the strategy, symbols and exchange.
pub struct LiveSessionSettings {
    /// Public market-data category; sent to connectors as `category` via `as_path()`.
    pub category: PublicChannel,
    /// Candle interval used when subscribing to market data.
    pub interval: Interval,
    /// Fixed order size used by the `FixedOrderSizer`; must be positive.
    pub quantity: Quantity,
    /// Simulated slippage (basis points) for the local paper execution client.
    pub slippage_bps: Decimal,
    /// Simulated fee (basis points) for the local paper execution client.
    pub fee_bps: Decimal,
    /// History length given to `StrategyContext::new`.
    pub history: usize,
    /// Bind address of the metrics server.
    pub metrics_addr: SocketAddr,
    /// Persistence engine and file locations.
    pub persistence: PersistenceSettings,
    /// Starting balances for a portfolio that is not restored from exchange or disk.
    pub initial_balances: HashMap<Symbol, Decimal>,
    /// Currency used for portfolio reporting and for cash reconciliation.
    pub reporting_currency: Symbol,
    /// Optional market-definition file (presumably read by `load_market_registry` —
    /// not visible in this chunk).
    pub markets_file: Option<PathBuf>,
    /// Alert routing configuration (e.g. webhook URL).
    pub alerting: AlertingConfig,
    /// Paper or live execution.
    pub exec_backend: ExecutionBackend,
    /// Risk limits (negative values are clamped to zero) and max drawdown.
    pub risk: RiskManagementConfig,
    /// Delay between periodic reconciliation passes (live backends only).
    pub reconciliation_interval: Duration,
    /// Relative divergence that escalates a reconciliation mismatch; values <= 0 are
    /// replaced by 0.000001.
    pub reconciliation_threshold: Decimal,
    /// Name of the registered connector factory (e.g. "paper", "bybit", "binance").
    pub driver: String,
    /// Order-book depth passed to the connector.
    pub orderbook_depth: usize,
    /// When set, enables the Parquet flight recorder rooted at this path.
    pub record_path: Option<PathBuf>,
    /// Bind address of the control-plane server.
    pub control_addr: SocketAddr,
}
202
203impl LiveSessionSettings {
204 fn risk_limits(&self) -> RiskLimits {
205 RiskLimits {
206 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
207 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
208 }
209 }
210}
211
212fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
213 match settings.persistence.engine {
214 PersistenceEngine::Sqlite => {
215 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
216 SqliteStateRepository::new(settings.persistence.state_path.clone()),
217 );
218 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
219 SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
220 );
221 Ok(PersistenceHandles {
222 state: state_repo,
223 algo: algo_repo,
224 })
225 }
226 PersistenceEngine::Lmdb => {
227 let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
228 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
229 Arc::new(journal.state_repo());
230 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
231 Arc::new(journal.algo_repo());
232 Ok(PersistenceHandles {
233 state: state_repo,
234 algo: algo_repo,
235 })
236 }
237 }
238}
239
240pub async fn run_live(
241 strategy: Box<dyn Strategy>,
242 symbols: Vec<String>,
243 exchange: ExchangeConfig,
244 settings: LiveSessionSettings,
245) -> Result<()> {
246 run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
247}
248
249pub async fn run_live_with_shutdown(
251 strategy: Box<dyn Strategy>,
252 symbols: Vec<String>,
253 exchange: ExchangeConfig,
254 settings: LiveSessionSettings,
255 shutdown: ShutdownSignal,
256) -> Result<()> {
257 if symbols.is_empty() {
258 return Err(anyhow!("strategy did not declare any subscriptions"));
259 }
260 if settings.quantity <= Decimal::ZERO {
261 return Err(anyhow!("--quantity must be positive"));
262 }
263
264 let public_connection = Arc::new(AtomicBool::new(false));
265 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
266 Some(Arc::new(AtomicBool::new(false)))
267 } else {
268 None
269 };
270 ensure_builtin_connectors_registered();
271 let connector_payload = build_exchange_payload(&exchange, &settings);
272 let connector_factory = get_connector_factory(&settings.driver)
273 .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
274 let stream_config = ConnectorStreamConfig {
275 ws_url: Some(exchange.ws_url.clone()),
276 metadata: json!({
277 "category": settings.category.as_path(),
278 "symbols": symbols.clone(),
279 "orderbook_depth": settings.orderbook_depth,
280 }),
281 connection_status: Some(public_connection.clone()),
282 };
283 let mut connector_stream = connector_factory
284 .create_market_stream(&connector_payload, stream_config)
285 .await
286 .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
287 connector_stream
288 .subscribe(&symbols, settings.interval)
289 .await
290 .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
291 let market_stream: Box<dyn LiveMarketStream> =
292 Box::new(FactoryStreamAdapter::new(connector_stream));
293
294 let execution_client =
295 build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
296 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
297 if matches!(settings.exec_backend, ExecutionBackend::Live) {
298 info!(
299 rest = %exchange.rest_url,
300 driver = ?settings.driver,
301 "live execution enabled via {:?} REST",
302 settings.driver
303 );
304 }
305 let risk_checker: Arc<dyn PreTradeRiskChecker> =
306 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
307 let execution = ExecutionEngine::new(
308 execution_client.clone(),
309 Box::new(FixedOrderSizer {
310 quantity: settings.quantity,
311 }),
312 risk_checker,
313 );
314
315 let mut bootstrap = None;
316 if matches!(settings.exec_backend, ExecutionBackend::Live) {
317 info!("synchronizing portfolio snapshot from exchange");
318 let positions = execution_client
319 .positions()
320 .await
321 .context("failed to fetch remote positions")?;
322 let balances = execution_client
323 .account_balances()
324 .await
325 .context("failed to fetch remote account balances")?;
326 let mut open_orders = Vec::new();
327 for symbol in &symbols {
328 let mut symbol_orders = execution_client
329 .list_open_orders(symbol)
330 .await
331 .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
332 open_orders.append(&mut symbol_orders);
333 }
334 bootstrap = Some(LiveBootstrap {
335 positions,
336 balances,
337 open_orders,
338 });
339 }
340
341 let persistence = build_persistence_handles(&settings)?;
342
343 let initial_open_orders = bootstrap
345 .as_ref()
346 .map(|data| data.open_orders.clone())
347 .unwrap_or_default();
348 let orchestrator = OrderOrchestrator::new(
349 Arc::new(execution),
350 persistence.algo.clone(),
351 initial_open_orders,
352 )
353 .await?;
354
355 let runtime = LiveRuntime::new(
356 market_stream,
357 strategy,
358 symbols,
359 orchestrator,
360 persistence.state,
361 settings,
362 exchange.ws_url.clone(),
363 market_registry,
364 shutdown,
365 public_connection,
366 private_connection,
367 bootstrap,
368 )
369 .await?;
370 runtime.run().await
371}
372
373async fn build_execution_client(
374 settings: &LiveSessionSettings,
375 connector_factory: Arc<dyn ConnectorFactory>,
376 connector_payload: &Value,
377) -> Result<Arc<dyn ExecutionClient>> {
378 match settings.exec_backend {
379 ExecutionBackend::Paper => {
380 if settings.driver == "paper" {
381 return connector_factory
382 .create_execution_client(connector_payload)
383 .await
384 .map_err(|err| anyhow!("failed to create execution client: {err}"));
385 }
386 Ok(Arc::new(PaperExecutionClient::new(
387 "paper".to_string(),
388 vec!["BTCUSDT".to_string()],
389 settings.slippage_bps,
390 settings.fee_bps,
391 )))
392 }
393 ExecutionBackend::Live => connector_factory
394 .create_execution_client(connector_payload)
395 .await
396 .map_err(|err| anyhow!("failed to create execution client: {err}")),
397 }
398}
399
/// A running session: built by `LiveRuntime::new`, consumed by `LiveRuntime::run`.
struct LiveRuntime {
    /// Public market-data stream polled on every loop iteration.
    market: Box<dyn LiveMarketStream>,
    /// Order orchestrator; the run loop also drives its `on_timer_tick` once per second.
    orchestrator: Arc<OrderOrchestrator>,
    /// Repository the final state is written to on shutdown.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// In-memory session state shared with background tasks.
    persisted: Arc<Mutex<LiveState>>,
    /// Bus on which market and private events are published to subscribers.
    event_bus: Arc<EventBus>,
    /// Optional Parquet flight recorder; flushed on shutdown.
    recorder: Option<ParquetRecorder>,
    /// Control-plane server task; awaited (not aborted) on shutdown.
    control_task: Option<JoinHandle<()>>,
    /// Shared shutdown signal.
    shutdown: ShutdownSignal,
    /// Task returned by `spawn_metrics_server`.
    metrics_task: JoinHandle<()>,
    /// Task returned by `AlertManager::spawn_watchdog`, if any.
    alert_task: Option<JoinHandle<()>>,
    /// Periodic reconciliation task (non-paper backends only).
    reconciliation_task: Option<JoinHandle<()>>,
    /// Reconciliation dependencies (non-paper backends only); also used for the startup pass.
    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
    /// Order updates and fills forwarded by the private-stream task.
    private_event_rx: mpsc::Receiver<BrokerEvent>,
    /// Shared with the Bybit private-stream task; not read by the runtime itself.
    #[allow(dead_code)]
    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    /// Event-bus subscriber tasks; aborted on shutdown.
    subscriber_handles: Vec<JoinHandle<()>>,
    /// Connection-status monitor tasks; aborted on shutdown.
    connection_monitors: Vec<JoinHandle<()>>,
    /// Order-timeout monitor task; aborted on shutdown.
    order_timeout_task: JoinHandle<()>,
    /// The strategy; its state is persisted on shutdown.
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Public-stream connection flag, held for the runtime's lifetime.
    _public_connection: Arc<AtomicBool>,
    /// Private-stream connection flag (live sessions only), held for the runtime's lifetime.
    _private_connection: Option<Arc<AtomicBool>>,
}
423
/// Remote account snapshot fetched at startup for `Live` sessions.
struct LiveBootstrap {
    /// Positions reported by the exchange.
    positions: Vec<Position>,
    /// Account balances reported by the exchange.
    balances: Vec<AccountBalance>,
    /// Open orders for every subscribed symbol; these replace the persisted open orders.
    open_orders: Vec<Order>,
}
429
430impl LiveRuntime {
431 #[allow(clippy::too_many_arguments)]
432 async fn new(
433 market: Box<dyn LiveMarketStream>,
434 mut strategy: Box<dyn Strategy>,
435 symbols: Vec<String>,
436 orchestrator: OrderOrchestrator,
437 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
438 settings: LiveSessionSettings,
439 #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
440 market_registry: Arc<MarketRegistry>,
441 shutdown: ShutdownSignal,
442 public_connection: Arc<AtomicBool>,
443 private_connection: Option<Arc<AtomicBool>>,
444 bootstrap: Option<LiveBootstrap>,
445 ) -> Result<Self> {
446 let mut strategy_ctx = StrategyContext::new(settings.history);
447 let driver = Arc::new(settings.driver.clone());
448 let mut persisted = match tokio::task::spawn_blocking({
449 let repo = state_repo.clone();
450 move || repo.load()
451 })
452 .await
453 {
454 Ok(Ok(state)) => state,
455 Ok(Err(err)) => {
456 warn!(error = %err, "failed to load live state; starting from defaults");
457 LiveState::default()
458 }
459 Err(err) => {
460 warn!(error = %err, "state load task failed; starting from defaults");
461 LiveState::default()
462 }
463 };
464 let mut live_bootstrap = None;
465 if let Some(data) = bootstrap {
466 persisted.open_orders = data.open_orders;
467 live_bootstrap = Some((data.positions, data.balances));
468 } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
469 warn!("live session missing bootstrap data; continuing without remote snapshot");
470 }
471
472 let portfolio_cfg = PortfolioConfig {
473 initial_balances: settings.initial_balances.clone(),
474 reporting_currency: settings.reporting_currency.clone(),
475 max_drawdown: Some(settings.risk.max_drawdown),
476 };
477 let portfolio = if let Some((positions, balances)) = live_bootstrap {
478 Portfolio::from_exchange_state(
479 positions,
480 balances,
481 portfolio_cfg.clone(),
482 market_registry.clone(),
483 )
484 } else if let Some(snapshot) = persisted.portfolio.take() {
485 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
486 } else {
487 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
488 };
489 strategy_ctx.update_positions(portfolio.positions());
490 if let Some(state) = persisted.strategy_state.take() {
492 info!("restoring strategy state from persistence");
493 strategy
494 .restore(state)
495 .context("failed to restore strategy state")?;
496 }
497 persisted.portfolio = Some(portfolio.snapshot());
498
499 let mut market_snapshots = HashMap::new();
500 for symbol in &symbols {
501 let mut snapshot = MarketSnapshot::default();
502 if let Some(price) = persisted.last_prices.get(symbol).copied() {
503 snapshot.last_trade = Some(price);
504 }
505 market_snapshots.insert(symbol.clone(), snapshot);
506 }
507
508 let metrics = LiveMetrics::new();
509 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
510 if let Some(flag) = &private_connection {
511 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
512 }
513 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
514 let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
515 let alerts = AlertManager::new(
516 settings.alerting,
517 dispatcher,
518 Some(public_connection.clone()),
519 private_connection.clone(),
520 );
521 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
522 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
523 let alerts = Arc::new(alerts);
524 let alert_task = alerts.spawn_watchdog();
525 let metrics = Arc::new(metrics);
526 let mut connection_monitors = Vec::new();
527 connection_monitors.push(spawn_connection_monitor(
528 shutdown.clone(),
529 public_connection.clone(),
530 metrics.clone(),
531 "public",
532 ));
533 if let Some(flag) = private_connection.clone() {
534 connection_monitors.push(spawn_connection_monitor(
535 shutdown.clone(),
536 flag,
537 metrics.clone(),
538 "private",
539 ));
540 }
541
542 if !settings.exec_backend.is_paper() {
543 let execution_engine = orchestrator.execution_engine();
544 let exec_client = execution_engine.client();
545 match settings.driver.as_str() {
546 "bybit" | "" => {
547 #[cfg(feature = "bybit")]
548 {
549 let bybit = exec_client
550 .as_ref()
551 .as_any()
552 .downcast_ref::<BybitClient>()
553 .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
554 let creds = bybit
555 .get_credentials()
556 .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
557 spawn_bybit_private_stream(
558 creds,
559 bybit.get_ws_url(),
560 private_event_tx.clone(),
561 exec_client.clone(),
562 symbols.clone(),
563 last_private_sync.clone(),
564 private_connection.clone(),
565 metrics.clone(),
566 shutdown.clone(),
567 );
568 }
569 #[cfg(not(feature = "bybit"))]
570 {
571 bail!("driver 'bybit' is unavailable without the 'bybit' feature");
572 }
573 }
574 "binance" => {
575 #[cfg(feature = "binance")]
576 {
577 spawn_binance_private_stream(
578 exec_client.clone(),
579 exchange_ws_url.clone(),
580 private_event_tx.clone(),
581 private_connection.clone(),
582 metrics.clone(),
583 shutdown.clone(),
584 );
585 }
586 #[cfg(not(feature = "binance"))]
587 {
588 bail!("driver 'binance' is unavailable without the 'binance' feature");
589 }
590 }
591 "paper" => {}
592 other => {
593 bail!("private stream unsupported for driver '{other}'");
594 }
595 }
596 }
597
598 let recorder = if let Some(record_path) = settings.record_path.clone() {
599 let config = RecorderConfig {
600 root: record_path.clone(),
601 ..RecorderConfig::default()
602 };
603 match ParquetRecorder::spawn(config).await {
604 Ok(recorder) => {
605 info!(path = %record_path.display(), "flight recorder enabled");
606 Some(recorder)
607 }
608 Err(err) => {
609 warn!(
610 error = %err,
611 path = %record_path.display(),
612 "failed to start flight recorder"
613 );
614 None
615 }
616 }
617 } else {
618 None
619 };
620 let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
621
622 let strategy = Arc::new(Mutex::new(strategy));
623 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
624 let portfolio = Arc::new(Mutex::new(portfolio));
625 let market_cache = Arc::new(Mutex::new(market_snapshots));
626 let persisted = Arc::new(Mutex::new(persisted));
627 let orchestrator = Arc::new(orchestrator);
628 let event_bus = Arc::new(EventBus::new(2048));
629 let last_data_timestamp = Arc::new(AtomicI64::new(0));
630 let control_task = control::spawn_control_plane(
631 settings.control_addr,
632 portfolio.clone(),
633 orchestrator.clone(),
634 persisted.clone(),
635 last_data_timestamp.clone(),
636 shutdown.clone(),
637 );
638 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
639 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
640 client: orchestrator.execution_engine().client(),
641 portfolio: portfolio.clone(),
642 persisted: persisted.clone(),
643 state_repo: state_repo.clone(),
644 alerts: alerts.clone(),
645 metrics: metrics.clone(),
646 reporting_currency: settings.reporting_currency.clone(),
647 threshold: settings.reconciliation_threshold,
648 }))
649 });
650 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
651 spawn_reconciliation_loop(
652 ctx.clone(),
653 shutdown.clone(),
654 settings.reconciliation_interval,
655 )
656 });
657 let subscriber_handles = spawn_event_subscribers(
658 event_bus.clone(),
659 strategy.clone(),
660 strategy_ctx.clone(),
661 orchestrator.clone(),
662 portfolio.clone(),
663 metrics.clone(),
664 alerts.clone(),
665 market_cache.clone(),
666 state_repo.clone(),
667 persisted.clone(),
668 settings.exec_backend,
669 recorder_handle.clone(),
670 last_data_timestamp.clone(),
671 driver.clone(),
672 );
673 let order_timeout_task = spawn_order_timeout_monitor(
674 orchestrator.clone(),
675 event_bus.clone(),
676 alerts.clone(),
677 shutdown.clone(),
678 );
679
680 info!(
681 symbols = ?symbols,
682 category = ?settings.category,
683 metrics_addr = %settings.metrics_addr,
684 state_path = %settings.persistence.state_path.display(),
685 persistence_engine = ?settings.persistence.engine,
686 history = settings.history,
687 "market stream ready"
688 );
689
690 for symbol in &symbols {
691 let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
692 orchestrator.update_risk_context(symbol.clone(), ctx);
693 }
694
695 Ok(Self {
696 market,
697 orchestrator,
698 state_repo,
699 persisted,
700 event_bus,
701 recorder,
702 control_task: Some(control_task),
703 shutdown,
704 metrics_task,
705 alert_task,
706 reconciliation_task,
707 reconciliation_ctx,
708 private_event_rx,
709 last_private_sync,
710 subscriber_handles,
711 connection_monitors,
712 order_timeout_task,
713 strategy,
714 _public_connection: public_connection,
715 _private_connection: private_connection,
716 })
717 }
718
719 async fn run(mut self) -> Result<()> {
720 info!("live session started");
721 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
722 perform_state_reconciliation(ctx.as_ref())
723 .await
724 .context("initial state reconciliation failed")?;
725 }
726 let backoff = Duration::from_millis(200);
727 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
728
729 while !self.shutdown.triggered() {
730 let mut progressed = false;
731
732 if let Some(tick) = self.market.next_tick().await? {
733 progressed = true;
734 self.event_bus.publish(Event::Tick(TickEvent { tick }));
735 }
736
737 if let Some(candle) = self.market.next_candle().await? {
738 progressed = true;
739 self.event_bus
740 .publish(Event::Candle(CandleEvent { candle }));
741 }
742
743 if let Some(book) = self.market.next_order_book().await? {
744 progressed = true;
745 self.event_bus
746 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
747 }
748
749 tokio::select! {
750 biased;
751 Some(event) = self.private_event_rx.recv() => {
752 progressed = true;
753 match event {
754 BrokerEvent::OrderUpdate(order) => {
755 info!(
756 order_id = %order.id,
757 status = ?order.status,
758 symbol = %order.request.symbol,
759 "received private order update"
760 );
761 self.event_bus
762 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
763 }
764 BrokerEvent::Fill(fill) => {
765 info!(
766 order_id = %fill.order_id,
767 symbol = %fill.symbol,
768 qty = %fill.fill_quantity,
769 price = %fill.fill_price,
770 "received private fill"
771 );
772 self.event_bus.publish(Event::Fill(FillEvent { fill }));
773 }
774 }
775 }
776 _ = orchestrator_timer.tick() => {
777 if let Err(e) = self.orchestrator.on_timer_tick().await {
779 error!("Orchestrator timer tick failed: {}", e);
780 }
781 }
782 else => {}
783 }
784
785 if !progressed && !self.shutdown.sleep(backoff).await {
786 break;
787 }
788 }
789 info!("live session stopping");
790 self.metrics_task.abort();
791 if let Some(handle) = self.alert_task.take() {
792 handle.abort();
793 }
794 if let Some(handle) = self.reconciliation_task.take() {
795 handle.abort();
796 }
797 self.order_timeout_task.abort();
798 for handle in self.subscriber_handles.drain(..) {
799 handle.abort();
800 }
801 for handle in self.connection_monitors.drain(..) {
802 handle.abort();
803 }
804 if let Err(err) = persist_state(
805 self.state_repo.clone(),
806 self.persisted.clone(),
807 Some(self.strategy.clone()),
808 )
809 .await
810 {
811 warn!(error = %err, "failed to persist shutdown state");
812 }
813 if let Some(task) = self.control_task.take() {
814 if let Err(err) = task.await {
815 warn!(error = %err, "control plane server task aborted");
816 }
817 }
818 if let Some(recorder) = self.recorder.take() {
819 if let Err(err) = recorder.shutdown().await {
820 warn!(error = %err, "failed to flush flight recorder");
821 }
822 }
823 Ok(())
824 }
825}
826
/// Dependencies for comparing local state with the exchange; see
/// `perform_state_reconciliation`.
struct ReconciliationContext {
    /// Execution client used to fetch remote positions and balances.
    client: Arc<dyn ExecutionClient>,
    /// Local portfolio compared against the exchange and switched to liquidate-only on
    /// severe divergence.
    portfolio: Arc<Mutex<Portfolio>>,
    /// Shared session state that receives the post-divergence portfolio snapshot.
    persisted: Arc<Mutex<LiveState>>,
    /// Repository the liquidate-only transition is persisted to.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Alert sink for severe divergence.
    alerts: Arc<AlertManager>,
    /// Receives per-symbol position diffs and the cash diff.
    metrics: Arc<LiveMetrics>,
    /// Currency whose balance is reconciled against local cash.
    reporting_currency: Symbol,
    /// Relative divergence at or above which a mismatch is escalated (always > 0).
    threshold: Decimal,
}
837
/// Constructor arguments for `ReconciliationContext::new`; fields mirror the context,
/// except that `threshold` may be non-positive here and is clamped by the constructor.
struct ReconciliationContextConfig {
    client: Arc<dyn ExecutionClient>,
    portfolio: Arc<Mutex<Portfolio>>,
    persisted: Arc<Mutex<LiveState>>,
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    alerts: Arc<AlertManager>,
    metrics: Arc<LiveMetrics>,
    reporting_currency: Symbol,
    threshold: Decimal,
}
848
849impl ReconciliationContext {
850 fn new(config: ReconciliationContextConfig) -> Self {
851 let ReconciliationContextConfig {
852 client,
853 portfolio,
854 persisted,
855 state_repo,
856 alerts,
857 metrics,
858 reporting_currency,
859 threshold,
860 } = config;
861 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
863 min_threshold
864 } else {
865 threshold
866 };
867 Self {
868 client,
869 portfolio,
870 persisted,
871 state_repo,
872 alerts,
873 metrics,
874 reporting_currency,
875 threshold,
876 }
877 }
878}
879
880fn spawn_reconciliation_loop(
881 ctx: Arc<ReconciliationContext>,
882 shutdown: ShutdownSignal,
883 interval: Duration,
884) -> JoinHandle<()> {
885 tokio::spawn(async move {
886 while shutdown.sleep(interval).await {
887 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
888 error!(error = %err, "periodic state reconciliation failed");
889 }
890 }
891 })
892}
893
894async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
895 info!("running state reconciliation");
896 let remote_positions = ctx
897 .client
898 .positions()
899 .await
900 .context("failed to fetch remote positions")?;
901 let remote_balances = ctx
902 .client
903 .account_balances()
904 .await
905 .context("failed to fetch remote balances")?;
906 let (local_positions, local_cash) = {
907 let guard = ctx.portfolio.lock().await;
908 (guard.positions(), guard.cash())
909 };
910
911 let remote_map = positions_to_map(remote_positions);
912 let local_map = positions_to_map(local_positions);
913 let mut tracked_symbols: HashSet<String> = HashSet::new();
914 tracked_symbols.extend(remote_map.keys().cloned());
915 tracked_symbols.extend(local_map.keys().cloned());
916
917 let mut severe_findings = Vec::new();
918 for symbol in tracked_symbols {
919 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
920 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
921 let diff = (local_qty - remote_qty).abs();
922 let diff_value = diff.to_f64().unwrap_or(0.0);
923 ctx.metrics.update_position_diff(&symbol, diff_value);
924 if diff > Decimal::ZERO {
925 warn!(
926 symbol = %symbol,
927 local = %local_qty,
928 remote = %remote_qty,
929 diff = %diff,
930 "position mismatch detected during reconciliation"
931 );
932 let pct = normalize_diff(diff, remote_qty);
933 if pct >= ctx.threshold {
934 error!(
935 symbol = %symbol,
936 local = %local_qty,
937 remote = %remote_qty,
938 diff = %diff,
939 pct = %pct,
940 "position mismatch exceeds threshold"
941 );
942 severe_findings.push(format!(
943 "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
944 ));
945 }
946 }
947 }
948
949 let reporting = ctx.reporting_currency.as_str();
950 let remote_cash = remote_balances
951 .iter()
952 .find(|balance| balance.currency == reporting)
953 .map(|balance| balance.available)
954 .unwrap_or_else(|| Decimal::ZERO);
955 let cash_diff = (remote_cash - local_cash).abs();
956 ctx.metrics
957 .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
958 if cash_diff > Decimal::ZERO {
959 warn!(
960 currency = %reporting,
961 local = %local_cash,
962 remote = %remote_cash,
963 diff = %cash_diff,
964 "balance mismatch detected during reconciliation"
965 );
966 let pct = normalize_diff(cash_diff, remote_cash);
967 if pct >= ctx.threshold {
968 error!(
969 currency = %reporting,
970 local = %local_cash,
971 remote = %remote_cash,
972 diff = %cash_diff,
973 pct = %pct,
974 "balance mismatch exceeds threshold"
975 );
976 severe_findings.push(format!(
977 "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
978 ));
979 }
980 }
981
982 if severe_findings.is_empty() {
983 info!("state reconciliation complete with no critical divergence");
984 return Ok(());
985 }
986
987 let alert_body = severe_findings.join("; ");
988 ctx.alerts
989 .notify("State reconciliation divergence", &alert_body)
990 .await;
991 enforce_liquidate_only(ctx).await;
992 Ok(())
993}
994
995async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
996 let snapshot = {
997 let mut guard = ctx.portfolio.lock().await;
998 if !guard.set_liquidate_only(true) {
999 return;
1000 }
1001 info!("entering liquidate-only mode due to reconciliation divergence");
1002 guard.snapshot()
1003 };
1004 {
1005 let mut state = ctx.persisted.lock().await;
1006 state.portfolio = Some(snapshot);
1007 }
1008 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1009 warn!(error = %err, "failed to persist liquidate-only transition");
1010 }
1011}
1012
1013fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
1014 let mut map = HashMap::new();
1015 for position in positions {
1016 map.insert(position.symbol.clone(), position_signed_qty(&position));
1017 }
1018 map
1019}
1020
1021fn position_signed_qty(position: &Position) -> Decimal {
1022 match position.side {
1023 Some(Side::Buy) => position.quantity,
1024 Some(Side::Sell) => -position.quantity,
1025 None => Decimal::ZERO,
1026 }
1027}
1028
1029fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1030 if diff <= Decimal::ZERO {
1031 Decimal::ZERO
1032 } else {
1033 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1034 diff / denominator
1035 }
1036}
1037
1038fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
1039 let mut payload = serde_json::Map::new();
1040 payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1041 payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1042 payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1043 payload.insert(
1044 "api_secret".into(),
1045 Value::String(exchange.api_secret.clone()),
1046 );
1047 payload.insert(
1048 "category".into(),
1049 Value::String(settings.category.as_path().to_string()),
1050 );
1051 payload.insert(
1052 "orderbook_depth".into(),
1053 Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1054 );
1055 if let Value::Object(extra) = exchange.params.clone() {
1056 for (key, value) in extra {
1057 payload.insert(key, value);
1058 }
1059 }
1060 Value::Object(payload)
1061}
1062
1063#[derive(Default)]
1064struct MarketSnapshot {
1065 last_trade: Option<Price>,
1066 last_trade_ts: Option<DateTime<Utc>>,
1067 last_candle: Option<Candle>,
1068}
1069
1070impl MarketSnapshot {
1071 fn price(&self) -> Option<Price> {
1072 self.last_trade
1073 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1074 }
1075}
1076
1077pub struct ShutdownSignal {
1078 flag: Arc<AtomicBool>,
1079 notify: Arc<Notify>,
1080}
1081
1082impl ShutdownSignal {
1083 pub fn new() -> Self {
1084 let flag = Arc::new(AtomicBool::new(false));
1085 let notify = Arc::new(Notify::new());
1086 let flag_clone = flag.clone();
1087 let notify_clone = notify.clone();
1088 tokio::spawn(async move {
1089 if tokio::signal::ctrl_c().await.is_ok() {
1090 flag_clone.store(true, Ordering::SeqCst);
1091 notify_clone.notify_waiters();
1092 }
1093 });
1094 Self { flag, notify }
1095 }
1096
1097 pub fn trigger(&self) {
1098 self.flag.store(true, Ordering::SeqCst);
1099 self.notify.notify_waiters();
1100 }
1101
1102 pub fn triggered(&self) -> bool {
1103 self.flag.load(Ordering::SeqCst)
1104 }
1105
1106 pub async fn wait(&self) {
1107 if self.triggered() {
1108 return;
1109 }
1110 self.notify.notified().await;
1111 }
1112
1113 async fn sleep(&self, duration: Duration) -> bool {
1114 tokio::select! {
1115 _ = tokio::time::sleep(duration) => true,
1116 _ = self.notify.notified() => false,
1117 }
1118 }
1119}
1120
1121impl Default for ShutdownSignal {
1122 fn default() -> Self {
1123 Self::new()
1124 }
1125}
1126
1127impl Clone for ShutdownSignal {
1128 fn clone(&self) -> Self {
1129 Self {
1130 flag: self.flag.clone(),
1131 notify: self.notify.clone(),
1132 }
1133 }
1134}
1135
/// Spawns the long-lived event-bus subscriber tasks that drive the live
/// session and returns their join handles.
///
/// Four tasks are spawned, each with its own `bus.subscribe()` receiver:
/// 1. market data — `Tick`, `Candle` and `OrderBook` events (ticks and candles
///    are also written to `recorder` when one is configured);
/// 2. signals — `Signal` events routed through the order orchestrator;
/// 3. fills — `Fill` events (recorded, then applied via `process_fill_event`);
/// 4. order updates — `OrderUpdate` events (recorded, then applied via
///    `process_order_update_event`).
///
/// Handler errors are logged with `warn!` and never stop a task. Each task
/// exits when its receiver reports `RecvError::Closed`; `RecvError::Lagged` is
/// logged and the task keeps receiving (the skipped events are not replayed).
#[allow(clippy::too_many_arguments)]
fn spawn_event_subscribers(
    bus: Arc<EventBus>,
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    strategy_ctx: Arc<Mutex<StrategyContext>>,
    orchestrator: Arc<OrderOrchestrator>,
    portfolio: Arc<Mutex<Portfolio>>,
    metrics: Arc<LiveMetrics>,
    alerts: Arc<AlertManager>,
    market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    persisted: Arc<Mutex<LiveState>>,
    exec_backend: ExecutionBackend,
    recorder: Option<RecorderHandle>,
    last_data_timestamp: Arc<AtomicI64>,
    driver: Arc<String>,
) -> Vec<JoinHandle<()>> {
    let mut handles = Vec::new();
    let market_recorder = recorder.clone();

    // --- Market-data subscriber: ticks, candles and order books. ---
    // Each task gets its own clones of the shared handles before being moved
    // into its `async move` block.
    let market_bus = bus.clone();
    let market_strategy = strategy.clone();
    let market_ctx = strategy_ctx.clone();
    let market_metrics = metrics.clone();
    let market_alerts = alerts.clone();
    let market_state = state_repo.clone();
    let market_persisted = persisted.clone();
    let market_portfolio = portfolio.clone();
    let market_snapshot = market.clone();
    let orchestrator_clone = orchestrator.clone();
    let market_data_tracker = last_data_timestamp.clone();
    let driver_clone = driver.clone();
    handles.push(tokio::spawn(async move {
        let recorder = market_recorder;
        let mut stream = market_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Tick(evt)) => {
                    // Record before processing so the recording is independent of
                    // handler success.
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_tick(evt.tick.clone());
                    }
                    if let Err(err) = process_tick_event(
                        evt.tick,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "tick handler failed");
                    }
                }
                Ok(Event::Candle(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_candle(evt.candle.clone());
                    }
                    if let Err(err) = process_candle_event(
                        evt.candle,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_portfolio.clone(),
                        orchestrator_clone.clone(),
                        exec_backend,
                        market_state.clone(),
                        market_persisted.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "candle handler failed");
                    }
                }
                Ok(Event::OrderBook(evt)) => {
                    // Order books are not passed to the recorder here.
                    if let Err(err) = process_order_book_event(
                        evt.order_book,
                        market_strategy.clone(),
                        market_ctx.clone(),
                        market_metrics.clone(),
                        market_alerts.clone(),
                        market_snapshot.clone(),
                        market_bus.clone(),
                        market_data_tracker.clone(),
                        driver_clone.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order book handler failed");
                    }
                }
                // Other event kinds are handled by the other subscribers.
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "market subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Signal subscriber: routes strategy signals to the orchestrator. ---
    let exec_bus = bus.clone();
    let exec_portfolio = portfolio.clone();
    let exec_market = market.clone();
    let exec_persisted = persisted.clone();
    let exec_alerts = alerts.clone();
    let exec_metrics = metrics.clone();
    let exec_orchestrator = orchestrator.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = exec_orchestrator.clone();
        let mut stream = exec_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Signal(evt)) => {
                    if let Err(err) = process_signal_event(
                        evt.signal,
                        orchestrator.clone(),
                        exec_portfolio.clone(),
                        exec_market.clone(),
                        exec_persisted.clone(),
                        exec_alerts.clone(),
                        exec_metrics.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "signal handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "signal subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Fill subscriber. Takes ownership of `portfolio`, `strategy`,
    // `strategy_ctx` and `metrics` (their last use in this function). ---
    let fill_bus = bus.clone();
    let fill_state = state_repo.clone();
    let fill_orchestrator = orchestrator.clone();
    let fill_persisted = persisted.clone();
    let fill_alerts = alerts.clone();
    let fill_recorder = recorder.clone();
    handles.push(tokio::spawn(async move {
        let orchestrator = fill_orchestrator.clone();
        let persisted = fill_persisted.clone();
        let recorder = fill_recorder;
        let mut stream = fill_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::Fill(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_fill(evt.fill.clone());
                    }
                    if let Err(err) = process_fill_event(
                        evt.fill,
                        portfolio.clone(),
                        strategy.clone(),
                        strategy_ctx.clone(),
                        orchestrator.clone(),
                        metrics.clone(),
                        fill_alerts.clone(),
                        fill_state.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "fill handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "fill subscriber lagged");
                    continue;
                }
            }
        }
    }));

    // --- Order-update subscriber. Takes ownership of `recorder` and
    // `state_repo`. ---
    let order_bus = bus.clone();
    let order_persisted = persisted.clone();
    let order_alerts = alerts.clone();
    let order_orchestrator = orchestrator.clone();
    let order_recorder = recorder;
    handles.push(tokio::spawn(async move {
        let orchestrator = order_orchestrator.clone();
        let persisted = order_persisted.clone();
        let recorder = order_recorder;
        let mut stream = order_bus.subscribe();
        loop {
            match stream.recv().await {
                Ok(Event::OrderUpdate(evt)) => {
                    if let Some(handle) = recorder.as_ref() {
                        handle.record_order(evt.order.clone());
                    }
                    if let Err(err) = process_order_update_event(
                        evt.order,
                        orchestrator.clone(),
                        order_alerts.clone(),
                        state_repo.clone(),
                        persisted.clone(),
                    )
                    .await
                    {
                        warn!(error = %err, "order update handler failed");
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
                    warn!(lag = lag, "order subscriber lagged");
                    continue;
                }
            }
        }
    }));

    handles
}
1368
1369#[allow(clippy::too_many_arguments)]
1370async fn process_tick_event(
1371 tick: Tick,
1372 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1373 strategy_ctx: Arc<Mutex<StrategyContext>>,
1374 metrics: Arc<LiveMetrics>,
1375 alerts: Arc<AlertManager>,
1376 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1377 portfolio: Arc<Mutex<Portfolio>>,
1378 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1379 persisted: Arc<Mutex<LiveState>>,
1380 bus: Arc<EventBus>,
1381 last_data_timestamp: Arc<AtomicI64>,
1382) -> Result<()> {
1383 metrics.inc_tick();
1384 metrics.update_staleness(0.0);
1385 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1386 last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1387 alerts.heartbeat().await;
1388 {
1389 let mut guard = market.lock().await;
1390 if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1391 snapshot.last_trade = Some(tick.price);
1392 snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1393 }
1394 }
1395 let mut drawdown_triggered = false;
1396 let mut snapshot_on_trigger = None;
1397 {
1398 let mut guard = portfolio.lock().await;
1399 let was_liquidate_only = guard.liquidate_only();
1400 match guard.update_market_data(&tick.symbol, tick.price) {
1401 Ok(_) => {
1402 if !was_liquidate_only && guard.liquidate_only() {
1403 drawdown_triggered = true;
1404 snapshot_on_trigger = Some(guard.snapshot());
1405 }
1406 }
1407 Err(err) => {
1408 warn!(
1409 symbol = %tick.symbol,
1410 error = %err,
1411 "failed to refresh market data"
1412 );
1413 }
1414 }
1415 }
1416 {
1417 let mut state = persisted.lock().await;
1418 state.last_prices.insert(tick.symbol.clone(), tick.price);
1419 if drawdown_triggered {
1420 if let Some(snapshot) = snapshot_on_trigger.take() {
1421 state.portfolio = Some(snapshot);
1422 }
1423 }
1424 }
1425 if drawdown_triggered {
1426 persist_state(
1427 state_repo.clone(),
1428 persisted.clone(),
1429 Some(strategy.clone()),
1430 )
1431 .await?;
1432 alert_liquidate_only(alerts.clone()).await;
1433 }
1434 {
1435 let mut ctx = strategy_ctx.lock().await;
1436 ctx.push_tick(tick.clone());
1437 let lock_start = Instant::now();
1438 let mut strat = strategy.lock().await;
1439 log_strategy_lock("tick", lock_start.elapsed());
1440 let call_start = Instant::now();
1441 strat
1442 .on_tick(&ctx, &tick)
1443 .await
1444 .context("strategy failure on tick event")?;
1445 log_strategy_call("tick", call_start.elapsed());
1446 }
1447 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1448 Ok(())
1449}
1450
1451#[allow(clippy::too_many_arguments)]
1452async fn process_candle_event(
1453 candle: Candle,
1454 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1455 strategy_ctx: Arc<Mutex<StrategyContext>>,
1456 metrics: Arc<LiveMetrics>,
1457 alerts: Arc<AlertManager>,
1458 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1459 portfolio: Arc<Mutex<Portfolio>>,
1460 orchestrator: Arc<OrderOrchestrator>,
1461 exec_backend: ExecutionBackend,
1462 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1463 persisted: Arc<Mutex<LiveState>>,
1464 bus: Arc<EventBus>,
1465 last_data_timestamp: Arc<AtomicI64>,
1466) -> Result<()> {
1467 metrics.inc_candle();
1468 metrics.update_staleness(0.0);
1469 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1470 last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1471 alerts.heartbeat().await;
1472 metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1473 {
1474 let mut guard = market.lock().await;
1475 if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1476 snapshot.last_candle = Some(candle.clone());
1477 snapshot.last_trade = Some(candle.close);
1478 }
1479 }
1480 if exec_backend.is_paper() {
1481 let client = orchestrator.execution_engine().client();
1482 if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1483 paper.update_price(&candle.symbol, candle.close);
1484 }
1485 }
1486 let mut candle_drawdown_triggered = false;
1487 let mut candle_snapshot = None;
1488 {
1489 let mut guard = portfolio.lock().await;
1490 let was_liquidate_only = guard.liquidate_only();
1491 match guard.update_market_data(&candle.symbol, candle.close) {
1492 Ok(_) => {
1493 if !was_liquidate_only && guard.liquidate_only() {
1494 candle_drawdown_triggered = true;
1495 candle_snapshot = Some(guard.snapshot());
1496 }
1497 }
1498 Err(err) => {
1499 warn!(
1500 symbol = %candle.symbol,
1501 error = %err,
1502 "failed to refresh market data"
1503 );
1504 }
1505 }
1506 }
1507 if candle_drawdown_triggered {
1508 if let Some(snapshot) = candle_snapshot.take() {
1509 let mut persisted_guard = persisted.lock().await;
1510 persisted_guard.portfolio = Some(snapshot);
1511 }
1512 alert_liquidate_only(alerts.clone()).await;
1513 }
1514 {
1515 let mut ctx = strategy_ctx.lock().await;
1516 ctx.push_candle(candle.clone());
1517 let lock_start = Instant::now();
1518 let mut strat = strategy.lock().await;
1519 log_strategy_lock("candle", lock_start.elapsed());
1520 let call_start = Instant::now();
1521 strat
1522 .on_candle(&ctx, &candle)
1523 .await
1524 .context("strategy failure on candle event")?;
1525 log_strategy_call("candle", call_start.elapsed());
1526 }
1527 {
1528 let mut snapshot = persisted.lock().await;
1529 snapshot.last_candle_ts = Some(candle.timestamp);
1530 snapshot
1531 .last_prices
1532 .insert(candle.symbol.clone(), candle.close);
1533 }
1534 persist_state(
1535 state_repo.clone(),
1536 persisted.clone(),
1537 Some(strategy.clone()),
1538 )
1539 .await?;
1540 let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1541 orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1542 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1543 Ok(())
1544}
1545
1546#[allow(clippy::too_many_arguments)]
1547async fn process_order_book_event(
1548 mut book: OrderBook,
1549 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1550 strategy_ctx: Arc<Mutex<StrategyContext>>,
1551 metrics: Arc<LiveMetrics>,
1552 alerts: Arc<AlertManager>,
1553 _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1554 bus: Arc<EventBus>,
1555 last_data_timestamp: Arc<AtomicI64>,
1556 driver: Arc<String>,
1557) -> Result<()> {
1558 metrics.update_staleness(0.0);
1559 alerts.heartbeat().await;
1560 last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1561 let driver_name = driver.as_str();
1562 let local_checksum = if let Some(cs) = book.local_checksum {
1563 cs
1564 } else {
1565 let computed = book.computed_checksum(None);
1566 book.local_checksum = Some(computed);
1567 computed
1568 };
1569 if let Some(expected) = book.exchange_checksum {
1570 if expected != local_checksum {
1571 metrics.inc_checksum_mismatch(driver_name, &book.symbol);
1572 alerts
1573 .order_book_checksum_mismatch(driver_name, &book.symbol, expected, local_checksum)
1574 .await;
1575 }
1576 }
1577 {
1578 let mut ctx = strategy_ctx.lock().await;
1579 ctx.push_order_book(book.clone());
1580 let lock_start = Instant::now();
1581 let mut strat = strategy.lock().await;
1582 log_strategy_lock("order_book", lock_start.elapsed());
1583 let call_start = Instant::now();
1584 strat
1585 .on_order_book(&ctx, &book)
1586 .await
1587 .context("strategy failure on order book")?;
1588 log_strategy_call("order_book", call_start.elapsed());
1589 }
1590 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1591 Ok(())
1592}
1593
1594async fn process_signal_event(
1595 signal: Signal,
1596 orchestrator: Arc<OrderOrchestrator>,
1597 portfolio: Arc<Mutex<Portfolio>>,
1598 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1599 persisted: Arc<Mutex<LiveState>>,
1600 alerts: Arc<AlertManager>,
1601 metrics: Arc<LiveMetrics>,
1602) -> Result<()> {
1603 let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1604 orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1605 match orchestrator.on_signal(&signal, &ctx).await {
1606 Ok(()) => {
1607 alerts.reset_order_failures().await;
1608 }
1609 Err(err) => {
1610 metrics.inc_order_failure();
1611 alerts
1612 .order_failure(&format!("orchestrator error: {err}"))
1613 .await;
1614 }
1615 }
1616 Ok(())
1617}
1618
1619fn log_strategy_lock(event: &str, wait: Duration) {
1620 let wait_ms = wait.as_secs_f64() * 1000.0;
1621 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1622 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1623 } else {
1624 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1625 }
1626}
1627
1628fn log_strategy_call(event: &str, elapsed: Duration) {
1629 let duration_ms = elapsed.as_secs_f64() * 1000.0;
1630 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1631 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1632 } else {
1633 trace!(target: "strategy", event, duration_ms, "strategy call completed");
1634 }
1635}
1636
1637#[allow(clippy::too_many_arguments)]
1638async fn process_fill_event(
1639 fill: Fill,
1640 portfolio: Arc<Mutex<Portfolio>>,
1641 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1642 strategy_ctx: Arc<Mutex<StrategyContext>>,
1643 orchestrator: Arc<OrderOrchestrator>,
1644 metrics: Arc<LiveMetrics>,
1645 alerts: Arc<AlertManager>,
1646 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1647 persisted: Arc<Mutex<LiveState>>,
1648) -> Result<()> {
1649 let mut drawdown_triggered = false;
1650 {
1651 let mut guard = portfolio.lock().await;
1652 let was_liquidate_only = guard.liquidate_only();
1653 guard
1654 .apply_fill(&fill)
1655 .context("Failed to apply fill to portfolio")?;
1656 if !was_liquidate_only && guard.liquidate_only() {
1657 drawdown_triggered = true;
1658 }
1659 let snapshot = guard.snapshot();
1660 let mut persisted_guard = persisted.lock().await;
1661 persisted_guard.portfolio = Some(snapshot);
1662 }
1663 {
1664 let positions = {
1665 let guard = portfolio.lock().await;
1666 guard.positions()
1667 };
1668 let mut ctx = strategy_ctx.lock().await;
1669 ctx.update_positions(positions);
1670 }
1671 orchestrator.on_fill(&fill).await.ok();
1672 {
1673 let ctx = strategy_ctx.lock().await;
1674 let lock_start = Instant::now();
1675 let mut strat = strategy.lock().await;
1676 log_strategy_lock("fill", lock_start.elapsed());
1677 let call_start = Instant::now();
1678 strat
1679 .on_fill(&ctx, &fill)
1680 .await
1681 .context("Strategy failed on fill event")?;
1682 log_strategy_call("fill", call_start.elapsed());
1683 }
1684 let equity = {
1685 let guard = portfolio.lock().await;
1686 guard.equity()
1687 };
1688 if let Some(value) = equity.to_f64() {
1689 metrics.update_equity(value);
1690 }
1691 alerts.update_equity(equity).await;
1692 metrics.inc_order();
1693 alerts
1694 .notify(
1695 "Order Filled",
1696 &format!(
1697 "order filled: {}@{} ({})",
1698 fill.fill_quantity,
1699 fill.fill_price,
1700 match fill.side {
1701 Side::Buy => "buy",
1702 Side::Sell => "sell",
1703 }
1704 ),
1705 )
1706 .await;
1707 if drawdown_triggered {
1708 alert_liquidate_only(alerts.clone()).await;
1709 }
1710 persist_state(
1711 state_repo.clone(),
1712 persisted.clone(),
1713 Some(strategy.clone()),
1714 )
1715 .await?;
1716 Ok(())
1717}
1718
1719async fn process_order_update_event(
1720 order: Order,
1721 orchestrator: Arc<OrderOrchestrator>,
1722 alerts: Arc<AlertManager>,
1723 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1724 persisted: Arc<Mutex<LiveState>>,
1725) -> Result<()> {
1726 orchestrator.on_order_update(&order);
1727 if matches!(order.status, OrderStatus::Rejected) {
1728 error!(
1729 order_id = %order.id,
1730 symbol = %order.request.symbol,
1731 "order rejected by exchange"
1732 );
1733 alerts.order_failure("order rejected by exchange").await;
1734 alerts
1735 .notify(
1736 "Order rejected",
1737 &format!(
1738 "Order {} for {} was rejected",
1739 order.id, order.request.symbol
1740 ),
1741 )
1742 .await;
1743 }
1744 {
1745 let mut snapshot = persisted.lock().await;
1746 let mut found = false;
1747 for existing in &mut snapshot.open_orders {
1748 if existing.id == order.id {
1749 *existing = order.clone();
1750 found = true;
1751 break;
1752 }
1753 }
1754 if !found {
1755 snapshot.open_orders.push(order.clone());
1756 }
1757 if matches!(
1758 order.status,
1759 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1760 ) {
1761 snapshot.open_orders.retain(|o| o.id != order.id);
1762 }
1763 }
1764 persist_state(state_repo, persisted, None).await?;
1765 Ok(())
1766}
1767
1768async fn emit_signals(
1769 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1770 bus: Arc<EventBus>,
1771 metrics: Arc<LiveMetrics>,
1772) {
1773 let signals = {
1774 let mut strat = strategy.lock().await;
1775 strat.drain_signals()
1776 };
1777 if signals.is_empty() {
1778 return;
1779 }
1780 metrics.inc_signals(signals.len());
1781 for signal in signals {
1782 bus.publish(Event::Signal(SignalEvent { signal }));
1783 }
1784}
1785
1786async fn persist_state(
1787 repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1788 persisted: Arc<Mutex<LiveState>>,
1789 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1790) -> Result<()> {
1791 if let Some(strat_lock) = strategy {
1792 let strat = strat_lock.lock().await;
1794 if let Ok(json_state) = strat.snapshot() {
1795 let mut guard = persisted.lock().await;
1796 guard.strategy_state = Some(json_state);
1797 } else {
1798 warn!("failed to snapshot strategy state");
1799 }
1800 }
1801
1802 let snapshot = {
1803 let guard = persisted.lock().await;
1804 guard.clone()
1805 };
1806 tokio::task::spawn_blocking(move || repo.save(&snapshot))
1807 .await
1808 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1809 .map_err(|err| anyhow!(err.to_string()))
1810}
1811
1812async fn shared_risk_context(
1813 symbol: &str,
1814 portfolio: &Arc<Mutex<Portfolio>>,
1815 market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1816 persisted: &Arc<Mutex<LiveState>>,
1817) -> RiskContext {
1818 let (signed_qty, equity, liquidate_only) = {
1819 let guard = portfolio.lock().await;
1820 (
1821 guard.signed_position_qty(symbol),
1822 guard.equity(),
1823 guard.liquidate_only(),
1824 )
1825 };
1826 let observed_price = {
1827 let guard = market.lock().await;
1828 guard.get(symbol).and_then(|snapshot| snapshot.price())
1829 };
1830 let last_price = if let Some(price) = observed_price {
1831 price
1832 } else {
1833 let guard = persisted.lock().await;
1834 guard
1835 .last_prices
1836 .get(symbol)
1837 .copied()
1838 .unwrap_or(Decimal::ZERO)
1839 };
1840 RiskContext {
1841 signed_position_qty: signed_qty,
1842 portfolio_equity: equity,
1843 last_price,
1844 liquidate_only,
1845 }
1846}
1847
1848async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1849 alerts
1850 .notify(
1851 "Max drawdown triggered",
1852 "Portfolio entered liquidate-only mode; new exposure blocked until review",
1853 )
1854 .await;
1855}
1856
1857fn spawn_connection_monitor(
1858 shutdown: ShutdownSignal,
1859 flag: Arc<AtomicBool>,
1860 metrics: Arc<LiveMetrics>,
1861 stream: &'static str,
1862) -> JoinHandle<()> {
1863 tokio::spawn(async move {
1864 loop {
1865 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1866 if !shutdown.sleep(Duration::from_secs(5)).await {
1867 break;
1868 }
1869 }
1870 })
1871}
1872
1873fn spawn_order_timeout_monitor(
1874 orchestrator: Arc<OrderOrchestrator>,
1875 bus: Arc<EventBus>,
1876 alerts: Arc<AlertManager>,
1877 shutdown: ShutdownSignal,
1878) -> JoinHandle<()> {
1879 tokio::spawn(async move {
1880 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1881 loop {
1882 ticker.tick().await;
1883 if shutdown.triggered() {
1884 break;
1885 }
1886 match orchestrator.poll_stale_orders().await {
1887 Ok(updates) => {
1888 for order in updates {
1889 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1890 let message = format!(
1891 "Order {} for {} timed out after {}s",
1892 order.id,
1893 order.request.symbol,
1894 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1895 );
1896 error!(%message);
1897 alerts.order_failure(&message).await;
1898 alerts.notify("Order timeout", &message).await;
1899 }
1900 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1901 }
1902 }
1903 Err(err) => {
1904 warn!(error = %err, "order timeout monitor failed");
1905 }
1906 }
1907 }
1908 })
1909}
1910
1911async fn load_market_registry(
1912 client: Arc<dyn ExecutionClient>,
1913 settings: &LiveSessionSettings,
1914) -> Result<Arc<MarketRegistry>> {
1915 if let Some(path) = &settings.markets_file {
1916 let registry = MarketRegistry::load_from_file(path)
1917 .with_context(|| format!("failed to load markets from {}", path.display()))?;
1918 return Ok(Arc::new(registry));
1919 }
1920
1921 if settings.exec_backend.is_paper() {
1922 return Err(anyhow!(
1923 "paper execution requires --markets-file when exchange metadata is unavailable"
1924 ));
1925 }
1926
1927 let instruments = client
1928 .list_instruments(settings.category.as_path())
1929 .await
1930 .context("failed to fetch instruments from execution client")?;
1931 let registry =
1932 MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1933 Ok(Arc::new(registry))
1934}
1935
/// Spawns the background task that maintains the Bybit private WebSocket session.
///
/// After each successful connect the task:
/// - marks the private connection as up;
/// - replays open orders for `symbols` and recent executions into
///   `private_tx` (see `reconcile_bybit_private_state`);
/// - forwards `order` and `execution` topic messages as they arrive.
///
/// When the socket closes or a connection attempt fails, the connection is
/// marked down and the task retries after a 5 second back-off. Shutdown is
/// observed while waiting for frames and during the back-off, so the task
/// exits promptly even when the socket is quiet.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");
                    reconcile_bybit_private_state(&exec_client, &symbols, &last_sync, &private_tx)
                        .await;

                    loop {
                        // Race the next frame against shutdown so a quiet socket
                        // cannot keep the task alive after shutdown is requested.
                        let next = tokio::select! {
                            next = socket.next() => next,
                            _ = shutdown.wait() => None,
                        };
                        let Some(msg) = next else {
                            break;
                        };
                        if let Ok(Message::Text(text)) = msg {
                            forward_bybit_private_message(&text, &private_tx).await;
                        }
                    }

                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                    if !shutdown.triggered() {
                        warn!("Bybit private WebSocket stream closed; reconnecting");
                    }
                }
                Err(e) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                }
            }
            // Back off before reconnecting; `sleep` returns false if shutdown
            // interrupts it.
            if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                break;
            }
        }
    });
}

/// Replays state that may have been missed while the private stream was down.
///
/// Steps:
/// - Sends every open order for `symbols` as `BrokerEvent::OrderUpdate`.
/// - When `exec_client` is a `BybitClient`, sends executions since the last
///   sync (default: 30 minutes ago) as `BrokerEvent::Fill`.
/// - Advances `last_sync` to now.
///
/// Errors are logged and skipped.
///
/// NOTE(review): `last_sync` only advances here, on (re)connect. Executions
/// already received over the socket since the previous connect are therefore
/// replayed again on reconnect. Confirm that downstream fill handling is
/// idempotent.
#[cfg(feature = "bybit")]
async fn reconcile_bybit_private_state(
    exec_client: &Arc<dyn ExecutionClient>,
    symbols: &[String],
    last_sync: &tokio::sync::Mutex<Option<DateTime<Utc>>>,
    private_tx: &mpsc::Sender<BrokerEvent>,
) {
    for symbol in symbols {
        match exec_client.list_open_orders(symbol).await {
            Ok(orders) => {
                for order in orders {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send reconciled order update: {err}");
                    }
                }
            }
            Err(e) => {
                error!("failed to reconcile open orders for {symbol}: {e}");
            }
        }
    }
    let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() else {
        return;
    };
    let since = {
        let guard = last_sync.lock().await;
        guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
    };
    match bybit.list_executions_since(since).await {
        Ok(fills) => {
            for fill in fills {
                if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                    error!("failed to send reconciled fill: {err}");
                }
            }
        }
        Err(e) => {
            error!("failed to reconcile executions since {:?}: {}", since, e);
        }
    }
    let mut guard = last_sync.lock().await;
    *guard = Some(Utc::now());
}

/// Decodes one text frame from the Bybit private stream and forwards the
/// order updates (`order` topic) or fills (`execution` topic) it carries to
/// `private_tx`. Frames that fail to parse, and other topics, are ignored.
#[cfg(feature = "bybit")]
async fn forward_bybit_private_message(text: &str, private_tx: &mpsc::Sender<BrokerEvent>) {
    let Ok(value) = serde_json::from_str::<serde_json::Value>(text) else {
        return;
    };
    // Copy the topic out so `value` can be moved into the typed decoder below.
    let topic = value
        .get("topic")
        .and_then(serde_json::Value::as_str)
        .map(str::to_owned);
    match topic.as_deref() {
        Some("order") => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsOrder>>(value) else {
                return;
            };
            for update in msg.data {
                if let Ok(order) = update.to_tesser_order(None) {
                    if let Err(err) = private_tx.send(BrokerEvent::OrderUpdate(order)).await {
                        error!("failed to send private order update: {err}");
                    }
                }
            }
        }
        Some("execution") => {
            let Ok(msg) = serde_json::from_value::<PrivateMessage<BybitWsExecution>>(value)
            else {
                return;
            };
            for exec in msg.data {
                if let Ok(fill) = exec.to_tesser_fill() {
                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await {
                        error!("failed to send private fill event: {err}");
                    }
                }
            }
        }
        _ => {}
    }
}
2073
/// Spawns the background task that maintains the Binance user-data stream.
///
/// Each session:
/// - starts a listen key via the `BinanceClient`;
/// - connects the user-data WebSocket;
/// - forwards order updates and fills to `private_tx` in arrival order;
/// - sends a listen-key keepalive every 30 minutes.
///
/// A session ends when the stream reports `ListenKeyExpired` or when shutdown
/// is requested. The task then marks the connection down and, unless shutting
/// down, retries after a 5 second shutdown-aware back-off. The task returns
/// immediately if `exec_client` is not a `BinanceClient`.
///
/// NOTE(review): a socket drop that is not reported as `ListenKeyExpired` is
/// not observed here. Confirm whether `BinanceUserDataStream` surfaces
/// disconnects some other way.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    // Check shutdown here too; otherwise a persistently failing
                    // start would keep retrying forever.
                    if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                        break;
                    }
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);

                    // `Sender::blocking_send` panics when called from an async
                    // execution context, and the callback may well run on a Tokio
                    // worker. Bridge through an unbounded channel, whose `send`
                    // never blocks, and forward asynchronously. This keeps events
                    // in order.
                    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<BrokerEvent>();
                    let forward_tx = private_tx.clone();
                    tokio::spawn(async move {
                        while let Some(event) = event_rx.recv().await {
                            if forward_tx.send(event).await.is_err() {
                                break;
                            }
                        }
                    });

                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = event_tx.send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = event_tx.send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if let Err(err) = client.keepalive_user_stream().await {
                                warn!("failed to keep Binance user stream alive: {err}");
                                break;
                            }
                        }
                    });
                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {}
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            // `sleep` returns false when shutdown interrupts the back-off.
            if shutdown.triggered() || !shutdown.sleep(Duration::from_secs(5)).await {
                break;
            }
        }
    });
}