1use std::collections::{HashMap, HashSet};
2use std::net::SocketAddr;
3use std::path::PathBuf;
4use std::sync::{
5 atomic::{AtomicBool, AtomicI64, Ordering},
6 Arc, Once,
7};
8use std::time::{Duration, Instant};
9
10use anyhow::{anyhow, bail, Context, Result};
11use chrono::{DateTime, Utc};
12use clap::ValueEnum;
13use futures::StreamExt;
14use rust_decimal::{prelude::ToPrimitive, Decimal};
15use tokio::sync::{mpsc, Mutex, Notify};
16use tokio::task::JoinHandle;
17use tokio_tungstenite::tungstenite::Message;
18use tracing::{debug, error, info, trace, warn};
19
20use serde_json::{json, Value};
21
22fn ensure_builtin_connectors_registered() {
23 static INIT: Once = Once::new();
24 INIT.call_once(|| {
25 register_connector_factory(Arc::new(PaperFactory::default()));
26 #[cfg(feature = "bybit")]
27 register_bybit_factory();
28 #[cfg(feature = "binance")]
29 register_binance_factory();
30 });
31}
32
33#[cfg(feature = "binance")]
34use tesser_binance::{
35 fill_from_update, order_from_update, register_factory as register_binance_factory,
36 ws::{extract_order_update, BinanceUserDataStream, UserDataStreamEventsResponse},
37 BinanceClient,
38};
39use tesser_broker::{
40 get_connector_factory, register_connector_factory, BrokerResult, ConnectorFactory,
41 ConnectorStream, ConnectorStreamConfig, ExecutionClient,
42};
43#[cfg(feature = "bybit")]
44use tesser_bybit::ws::{BybitWsExecution, BybitWsOrder, PrivateMessage};
45#[cfg(feature = "bybit")]
46use tesser_bybit::{register_factory as register_bybit_factory, BybitClient, BybitCredentials};
47use tesser_config::{AlertingConfig, ExchangeConfig, PersistenceEngine, RiskManagementConfig};
48use tesser_core::{
49 AccountBalance, Candle, Fill, Interval, Order, OrderBook, OrderStatus, Position, Price,
50 Quantity, Side, Signal, Symbol, Tick,
51};
52use tesser_data::recorder::{ParquetRecorder, RecorderConfig, RecorderHandle};
53use tesser_events::{
54 CandleEvent, Event, EventBus, FillEvent, OrderBookEvent, OrderUpdateEvent, SignalEvent,
55 TickEvent,
56};
57use tesser_execution::{
58 AlgoStateRepository, BasicRiskChecker, ExecutionEngine, FixedOrderSizer, OrderOrchestrator,
59 PreTradeRiskChecker, RiskContext, RiskLimits, SqliteAlgoStateRepository, StoredAlgoState,
60};
61use tesser_journal::LmdbJournal;
62use tesser_markets::MarketRegistry;
63use tesser_paper::{PaperExecutionClient, PaperFactory};
64use tesser_portfolio::{
65 LiveState, Portfolio, PortfolioConfig, SqliteStateRepository, StateRepository,
66};
67use tesser_strategy::{Strategy, StrategyContext};
68
69use crate::alerts::{AlertDispatcher, AlertManager};
70use crate::control;
71use crate::telemetry::{spawn_metrics_server, LiveMetrics};
72use crate::PublicChannel;
73
/// Event received on the private broker channel and forwarded to the live loop.
#[derive(Debug)]
pub enum BrokerEvent {
    /// An order update; the live loop republishes it as `Event::OrderUpdate`.
    OrderUpdate(Order),
    /// A fill; the live loop republishes it as `Event::Fill`.
    Fill(Fill),
}
80
81#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
82#[value(rename_all = "kebab-case")]
83pub enum ExecutionBackend {
84 Paper,
85 Live,
86}
87
88impl ExecutionBackend {
89 fn is_paper(self) -> bool {
90 matches!(self, Self::Paper)
91 }
92}
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq, ValueEnum)]
95#[value(rename_all = "kebab-case")]
96pub enum PersistenceBackend {
97 Sqlite,
98 Lmdb,
99}
100
101impl From<PersistenceBackend> for PersistenceEngine {
102 fn from(value: PersistenceBackend) -> Self {
103 match value {
104 PersistenceBackend::Sqlite => PersistenceEngine::Sqlite,
105 PersistenceBackend::Lmdb => PersistenceEngine::Lmdb,
106 }
107 }
108}
109
/// Value returned by [`default_order_book_depth`].
const DEFAULT_ORDER_BOOK_DEPTH: usize = 50;

/// Returns the default order-book depth (`50`); usable in `const` contexts.
pub const fn default_order_book_depth() -> usize {
    DEFAULT_ORDER_BOOK_DEPTH
}
// NOTE(review): the two thresholds below are not referenced in this part of the
// file; judging by their names they bound how long acquiring the strategy lock
// and a single strategy call may take before a warning is logged — confirm at
// the use sites.
const STRATEGY_LOCK_WARN_THRESHOLD: Duration = Duration::from_millis(25);
const STRATEGY_CALL_WARN_THRESHOLD: Duration = Duration::from_millis(250);
117
/// Source of public market data polled by the live loop.
///
/// Each method yields `Ok(Some(_))` when an item is available. The live loop
/// treats `Ok(None)` as "nothing new" and propagates `Err(_)` to its caller.
#[async_trait::async_trait]
trait LiveMarketStream: Send {
    /// Fetches the next tick, if any.
    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>>;
    /// Fetches the next candle, if any.
    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>>;
    /// Fetches the next order-book snapshot, if any.
    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>>;
}
124
125struct FactoryStreamAdapter {
126 inner: Box<dyn ConnectorStream>,
127}
128
129impl FactoryStreamAdapter {
130 fn new(inner: Box<dyn ConnectorStream>) -> Self {
131 Self { inner }
132 }
133}
134
135#[async_trait::async_trait]
136impl LiveMarketStream for FactoryStreamAdapter {
137 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
138 self.inner.next_tick().await
139 }
140
141 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
142 self.inner.next_candle().await
143 }
144
145 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
146 self.inner.next_order_book().await
147 }
148}
149
150#[derive(Clone)]
151pub struct PersistenceSettings {
152 pub engine: PersistenceEngine,
153 pub state_path: PathBuf,
154 pub algo_path: PathBuf,
155}
156
157impl PersistenceSettings {
158 pub fn new(engine: PersistenceEngine, state_path: PathBuf) -> Self {
159 let algo_path = match engine {
160 PersistenceEngine::Sqlite => state_path.with_extension("algos.db"),
161 PersistenceEngine::Lmdb => state_path.clone(),
162 };
163 Self {
164 engine,
165 state_path,
166 algo_path,
167 }
168 }
169
170 fn algo_repo_path(&self) -> &PathBuf {
171 &self.algo_path
172 }
173}
174
/// Pair of repositories produced by `build_persistence_handles`.
struct PersistenceHandles {
    /// Repository for the session's `LiveState` snapshot.
    state: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// Repository for stored execution-algorithm state.
    algo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>>,
}
179
180pub struct LiveSessionSettings {
181 pub category: PublicChannel,
182 pub interval: Interval,
183 pub quantity: Quantity,
184 pub slippage_bps: Decimal,
185 pub fee_bps: Decimal,
186 pub history: usize,
187 pub metrics_addr: SocketAddr,
188 pub persistence: PersistenceSettings,
189 pub initial_balances: HashMap<Symbol, Decimal>,
190 pub reporting_currency: Symbol,
191 pub markets_file: Option<PathBuf>,
192 pub alerting: AlertingConfig,
193 pub exec_backend: ExecutionBackend,
194 pub risk: RiskManagementConfig,
195 pub reconciliation_interval: Duration,
196 pub reconciliation_threshold: Decimal,
197 pub driver: String,
198 pub orderbook_depth: usize,
199 pub record_path: Option<PathBuf>,
200 pub control_addr: SocketAddr,
201}
202
203impl LiveSessionSettings {
204 fn risk_limits(&self) -> RiskLimits {
205 RiskLimits {
206 max_order_quantity: self.risk.max_order_quantity.max(Decimal::ZERO),
207 max_position_quantity: self.risk.max_position_quantity.max(Decimal::ZERO),
208 }
209 }
210}
211
212fn build_persistence_handles(settings: &LiveSessionSettings) -> Result<PersistenceHandles> {
213 match settings.persistence.engine {
214 PersistenceEngine::Sqlite => {
215 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> = Arc::new(
216 SqliteStateRepository::new(settings.persistence.state_path.clone()),
217 );
218 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> = Arc::new(
219 SqliteAlgoStateRepository::new(settings.persistence.algo_repo_path())?,
220 );
221 Ok(PersistenceHandles {
222 state: state_repo,
223 algo: algo_repo,
224 })
225 }
226 PersistenceEngine::Lmdb => {
227 let journal = Arc::new(LmdbJournal::open(&settings.persistence.state_path)?);
228 let state_repo: Arc<dyn StateRepository<Snapshot = LiveState>> =
229 Arc::new(journal.state_repo());
230 let algo_repo: Arc<dyn AlgoStateRepository<State = StoredAlgoState>> =
231 Arc::new(journal.algo_repo());
232 Ok(PersistenceHandles {
233 state: state_repo,
234 algo: algo_repo,
235 })
236 }
237 }
238}
239
240pub async fn run_live(
241 strategy: Box<dyn Strategy>,
242 symbols: Vec<String>,
243 exchange: ExchangeConfig,
244 settings: LiveSessionSettings,
245) -> Result<()> {
246 run_live_with_shutdown(strategy, symbols, exchange, settings, ShutdownSignal::new()).await
247}
248
249pub async fn run_live_with_shutdown(
251 strategy: Box<dyn Strategy>,
252 symbols: Vec<String>,
253 exchange: ExchangeConfig,
254 settings: LiveSessionSettings,
255 shutdown: ShutdownSignal,
256) -> Result<()> {
257 if symbols.is_empty() {
258 return Err(anyhow!("strategy did not declare any subscriptions"));
259 }
260 if settings.quantity <= Decimal::ZERO {
261 return Err(anyhow!("--quantity must be positive"));
262 }
263
264 let public_connection = Arc::new(AtomicBool::new(false));
265 let private_connection = if matches!(settings.exec_backend, ExecutionBackend::Live) {
266 Some(Arc::new(AtomicBool::new(false)))
267 } else {
268 None
269 };
270 ensure_builtin_connectors_registered();
271 let connector_payload = build_exchange_payload(&exchange, &settings);
272 let connector_factory = get_connector_factory(&settings.driver)
273 .ok_or_else(|| anyhow!("driver {} is not registered", settings.driver))?;
274 let stream_config = ConnectorStreamConfig {
275 ws_url: Some(exchange.ws_url.clone()),
276 metadata: json!({
277 "category": settings.category.as_path(),
278 "symbols": symbols.clone(),
279 "orderbook_depth": settings.orderbook_depth,
280 }),
281 connection_status: Some(public_connection.clone()),
282 };
283 let mut connector_stream = connector_factory
284 .create_market_stream(&connector_payload, stream_config)
285 .await
286 .map_err(|err| anyhow!("failed to create market stream: {err}"))?;
287 connector_stream
288 .subscribe(&symbols, settings.interval)
289 .await
290 .map_err(|err| anyhow!("failed to subscribe via connector: {err}"))?;
291 let market_stream: Box<dyn LiveMarketStream> =
292 Box::new(FactoryStreamAdapter::new(connector_stream));
293
294 let execution_client =
295 build_execution_client(&settings, connector_factory.clone(), &connector_payload).await?;
296 let market_registry = load_market_registry(execution_client.clone(), &settings).await?;
297 if matches!(settings.exec_backend, ExecutionBackend::Live) {
298 info!(
299 rest = %exchange.rest_url,
300 driver = ?settings.driver,
301 "live execution enabled via {:?} REST",
302 settings.driver
303 );
304 }
305 let risk_checker: Arc<dyn PreTradeRiskChecker> =
306 Arc::new(BasicRiskChecker::new(settings.risk_limits()));
307 let execution = ExecutionEngine::new(
308 execution_client.clone(),
309 Box::new(FixedOrderSizer {
310 quantity: settings.quantity,
311 }),
312 risk_checker,
313 );
314
315 let mut bootstrap = None;
316 if matches!(settings.exec_backend, ExecutionBackend::Live) {
317 info!("synchronizing portfolio snapshot from exchange");
318 let positions = execution_client
319 .positions()
320 .await
321 .context("failed to fetch remote positions")?;
322 let balances = execution_client
323 .account_balances()
324 .await
325 .context("failed to fetch remote account balances")?;
326 let mut open_orders = Vec::new();
327 for symbol in &symbols {
328 let mut symbol_orders = execution_client
329 .list_open_orders(symbol)
330 .await
331 .with_context(|| format!("failed to fetch open orders for {symbol}"))?;
332 open_orders.append(&mut symbol_orders);
333 }
334 bootstrap = Some(LiveBootstrap {
335 positions,
336 balances,
337 open_orders,
338 });
339 }
340
341 let persistence = build_persistence_handles(&settings)?;
342
343 let initial_open_orders = bootstrap
345 .as_ref()
346 .map(|data| data.open_orders.clone())
347 .unwrap_or_default();
348 let orchestrator = OrderOrchestrator::new(
349 Arc::new(execution),
350 persistence.algo.clone(),
351 initial_open_orders,
352 )
353 .await?;
354
355 let runtime = LiveRuntime::new(
356 market_stream,
357 strategy,
358 symbols,
359 orchestrator,
360 persistence.state,
361 settings,
362 exchange.ws_url.clone(),
363 market_registry,
364 shutdown,
365 public_connection,
366 private_connection,
367 bootstrap,
368 )
369 .await?;
370 runtime.run().await
371}
372
373async fn build_execution_client(
374 settings: &LiveSessionSettings,
375 connector_factory: Arc<dyn ConnectorFactory>,
376 connector_payload: &Value,
377) -> Result<Arc<dyn ExecutionClient>> {
378 match settings.exec_backend {
379 ExecutionBackend::Paper => {
380 if settings.driver == "paper" {
381 return connector_factory
382 .create_execution_client(connector_payload)
383 .await
384 .map_err(|err| anyhow!("failed to create execution client: {err}"));
385 }
386 Ok(Arc::new(PaperExecutionClient::new(
387 "paper".to_string(),
388 vec!["BTCUSDT".to_string()],
389 settings.slippage_bps,
390 settings.fee_bps,
391 )))
392 }
393 ExecutionBackend::Live => connector_factory
394 .create_execution_client(connector_payload)
395 .await
396 .map_err(|err| anyhow!("failed to create execution client: {err}")),
397 }
398}
399
/// State owned by a running live session: the market stream, shared services
/// and the handles of every background task spawned during construction.
struct LiveRuntime {
    /// Public market-data stream polled by the main loop.
    market: Box<dyn LiveMarketStream>,
    /// Order orchestrator; receives a timer tick from the main loop.
    orchestrator: Arc<OrderOrchestrator>,
    /// Repository used to persist `persisted` when the session stops.
    state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
    /// In-memory copy of the persisted live state, shared with background tasks.
    persisted: Arc<Mutex<LiveState>>,
    /// Bus on which market and broker events are published.
    event_bus: Arc<EventBus>,
    /// Flight recorder, present only when a record path was configured and the
    /// recorder started successfully.
    recorder: Option<ParquetRecorder>,
    /// Control-plane server task; awaited (not aborted) when the session stops.
    control_task: Option<JoinHandle<()>>,
    /// Signal that ends the main loop.
    shutdown: ShutdownSignal,
    /// Metrics server task.
    metrics_task: JoinHandle<()>,
    /// Alert watchdog task, if the alert manager spawned one.
    alert_task: Option<JoinHandle<()>>,
    /// Periodic reconciliation task (non-paper backends only).
    reconciliation_task: Option<JoinHandle<()>>,
    /// Context for state reconciliation (non-paper backends only).
    reconciliation_ctx: Option<Arc<ReconciliationContext>>,
    /// Receiving end of the private broker event channel.
    private_event_rx: mpsc::Receiver<BrokerEvent>,
    /// Seeded from the persisted last-candle timestamp and shared with the
    /// Bybit private stream; not read by the runtime itself.
    #[allow(dead_code)]
    last_private_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    /// Event-bus subscriber tasks.
    subscriber_handles: Vec<JoinHandle<()>>,
    /// Tasks mirroring connection flags into metrics.
    connection_monitors: Vec<JoinHandle<()>>,
    /// Order-timeout monitor task.
    order_timeout_task: JoinHandle<()>,
    /// The strategy, shared with subscribers and persisted on shutdown.
    strategy: Arc<Mutex<Box<dyn Strategy>>>,
    /// Held for the lifetime of the runtime; not read here.
    _public_connection: Arc<AtomicBool>,
    /// Held for the lifetime of the runtime; not read here.
    _private_connection: Option<Arc<AtomicBool>>,
}
423
/// Snapshot fetched from the exchange before a live session starts.
struct LiveBootstrap {
    /// Remote positions used to seed the portfolio.
    positions: Vec<Position>,
    /// Remote account balances used to seed the portfolio.
    balances: Vec<AccountBalance>,
    /// Remote open orders; they replace the persisted open-order list.
    open_orders: Vec<Order>,
}
429
430impl LiveRuntime {
431 #[allow(clippy::too_many_arguments)]
432 async fn new(
433 market: Box<dyn LiveMarketStream>,
434 mut strategy: Box<dyn Strategy>,
435 symbols: Vec<String>,
436 orchestrator: OrderOrchestrator,
437 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
438 settings: LiveSessionSettings,
439 #[cfg_attr(not(feature = "binance"), allow(unused_variables))] exchange_ws_url: String,
440 market_registry: Arc<MarketRegistry>,
441 shutdown: ShutdownSignal,
442 public_connection: Arc<AtomicBool>,
443 private_connection: Option<Arc<AtomicBool>>,
444 bootstrap: Option<LiveBootstrap>,
445 ) -> Result<Self> {
446 let mut strategy_ctx = StrategyContext::new(settings.history);
447 let driver = Arc::new(settings.driver.clone());
448 let mut persisted = match tokio::task::spawn_blocking({
449 let repo = state_repo.clone();
450 move || repo.load()
451 })
452 .await
453 {
454 Ok(Ok(state)) => state,
455 Ok(Err(err)) => {
456 warn!(error = %err, "failed to load live state; starting from defaults");
457 LiveState::default()
458 }
459 Err(err) => {
460 warn!(error = %err, "state load task failed; starting from defaults");
461 LiveState::default()
462 }
463 };
464 let mut live_bootstrap = None;
465 if let Some(data) = bootstrap {
466 persisted.open_orders = data.open_orders;
467 live_bootstrap = Some((data.positions, data.balances));
468 } else if matches!(settings.exec_backend, ExecutionBackend::Live) {
469 warn!("live session missing bootstrap data; continuing without remote snapshot");
470 }
471
472 let portfolio_cfg = PortfolioConfig {
473 initial_balances: settings.initial_balances.clone(),
474 reporting_currency: settings.reporting_currency.clone(),
475 max_drawdown: Some(settings.risk.max_drawdown),
476 };
477 let portfolio = if let Some((positions, balances)) = live_bootstrap {
478 Portfolio::from_exchange_state(
479 positions,
480 balances,
481 portfolio_cfg.clone(),
482 market_registry.clone(),
483 )
484 } else if let Some(snapshot) = persisted.portfolio.take() {
485 Portfolio::from_state(snapshot, portfolio_cfg.clone(), market_registry.clone())
486 } else {
487 Portfolio::new(portfolio_cfg.clone(), market_registry.clone())
488 };
489 strategy_ctx.update_positions(portfolio.positions());
490 if let Some(state) = persisted.strategy_state.take() {
492 info!("restoring strategy state from persistence");
493 strategy
494 .restore(state)
495 .context("failed to restore strategy state")?;
496 }
497 persisted.portfolio = Some(portfolio.snapshot());
498
499 let mut market_snapshots = HashMap::new();
500 for symbol in &symbols {
501 let mut snapshot = MarketSnapshot::default();
502 if let Some(price) = persisted.last_prices.get(symbol).copied() {
503 snapshot.last_trade = Some(price);
504 }
505 market_snapshots.insert(symbol.clone(), snapshot);
506 }
507
508 let metrics = LiveMetrics::new();
509 metrics.update_connection_status("public", public_connection.load(Ordering::SeqCst));
510 if let Some(flag) = &private_connection {
511 metrics.update_connection_status("private", flag.load(Ordering::SeqCst));
512 }
513 let metrics_task = spawn_metrics_server(metrics.registry(), settings.metrics_addr);
514 let dispatcher = AlertDispatcher::new(settings.alerting.webhook_url.clone());
515 let alerts = AlertManager::new(
516 settings.alerting,
517 dispatcher,
518 Some(public_connection.clone()),
519 private_connection.clone(),
520 );
521 let (private_event_tx, private_event_rx) = mpsc::channel(1024);
522 let last_private_sync = Arc::new(tokio::sync::Mutex::new(persisted.last_candle_ts));
523 let alerts = Arc::new(alerts);
524 let alert_task = alerts.spawn_watchdog();
525 let metrics = Arc::new(metrics);
526 let mut connection_monitors = Vec::new();
527 connection_monitors.push(spawn_connection_monitor(
528 shutdown.clone(),
529 public_connection.clone(),
530 metrics.clone(),
531 "public",
532 ));
533 if let Some(flag) = private_connection.clone() {
534 connection_monitors.push(spawn_connection_monitor(
535 shutdown.clone(),
536 flag,
537 metrics.clone(),
538 "private",
539 ));
540 }
541
542 if !settings.exec_backend.is_paper() {
543 let execution_engine = orchestrator.execution_engine();
544 let exec_client = execution_engine.client();
545 match settings.driver.as_str() {
546 "bybit" | "" => {
547 #[cfg(feature = "bybit")]
548 {
549 let bybit = exec_client
550 .as_ref()
551 .as_any()
552 .downcast_ref::<BybitClient>()
553 .ok_or_else(|| anyhow!("execution client is not Bybit"))?;
554 let creds = bybit
555 .get_credentials()
556 .ok_or_else(|| anyhow!("live execution requires Bybit credentials"))?;
557 spawn_bybit_private_stream(
558 creds,
559 bybit.get_ws_url(),
560 private_event_tx.clone(),
561 exec_client.clone(),
562 symbols.clone(),
563 last_private_sync.clone(),
564 private_connection.clone(),
565 metrics.clone(),
566 shutdown.clone(),
567 );
568 }
569 #[cfg(not(feature = "bybit"))]
570 {
571 bail!("driver 'bybit' is unavailable without the 'bybit' feature");
572 }
573 }
574 "binance" => {
575 #[cfg(feature = "binance")]
576 {
577 spawn_binance_private_stream(
578 exec_client.clone(),
579 exchange_ws_url.clone(),
580 private_event_tx.clone(),
581 private_connection.clone(),
582 metrics.clone(),
583 shutdown.clone(),
584 );
585 }
586 #[cfg(not(feature = "binance"))]
587 {
588 bail!("driver 'binance' is unavailable without the 'binance' feature");
589 }
590 }
591 "paper" => {}
592 other => {
593 bail!("private stream unsupported for driver '{other}'");
594 }
595 }
596 }
597
598 let recorder = if let Some(record_path) = settings.record_path.clone() {
599 let config = RecorderConfig {
600 root: record_path.clone(),
601 ..RecorderConfig::default()
602 };
603 match ParquetRecorder::spawn(config).await {
604 Ok(recorder) => {
605 info!(path = %record_path.display(), "flight recorder enabled");
606 Some(recorder)
607 }
608 Err(err) => {
609 warn!(
610 error = %err,
611 path = %record_path.display(),
612 "failed to start flight recorder"
613 );
614 None
615 }
616 }
617 } else {
618 None
619 };
620 let recorder_handle = recorder.as_ref().map(|rec| rec.handle());
621
622 let strategy = Arc::new(Mutex::new(strategy));
623 let strategy_ctx = Arc::new(Mutex::new(strategy_ctx));
624 let portfolio = Arc::new(Mutex::new(portfolio));
625 let market_cache = Arc::new(Mutex::new(market_snapshots));
626 let persisted = Arc::new(Mutex::new(persisted));
627 let orchestrator = Arc::new(orchestrator);
628 let event_bus = Arc::new(EventBus::new(2048));
629 let last_data_timestamp = Arc::new(AtomicI64::new(0));
630 let control_task = control::spawn_control_plane(
631 settings.control_addr,
632 portfolio.clone(),
633 orchestrator.clone(),
634 persisted.clone(),
635 last_data_timestamp.clone(),
636 event_bus.clone(),
637 shutdown.clone(),
638 );
639 let reconciliation_ctx = (!settings.exec_backend.is_paper()).then(|| {
640 Arc::new(ReconciliationContext::new(ReconciliationContextConfig {
641 client: orchestrator.execution_engine().client(),
642 portfolio: portfolio.clone(),
643 persisted: persisted.clone(),
644 state_repo: state_repo.clone(),
645 alerts: alerts.clone(),
646 metrics: metrics.clone(),
647 reporting_currency: settings.reporting_currency.clone(),
648 threshold: settings.reconciliation_threshold,
649 }))
650 });
651 let reconciliation_task = reconciliation_ctx.as_ref().map(|ctx| {
652 spawn_reconciliation_loop(
653 ctx.clone(),
654 shutdown.clone(),
655 settings.reconciliation_interval,
656 )
657 });
658 let subscriber_handles = spawn_event_subscribers(
659 event_bus.clone(),
660 strategy.clone(),
661 strategy_ctx.clone(),
662 orchestrator.clone(),
663 portfolio.clone(),
664 metrics.clone(),
665 alerts.clone(),
666 market_cache.clone(),
667 state_repo.clone(),
668 persisted.clone(),
669 settings.exec_backend,
670 recorder_handle.clone(),
671 last_data_timestamp.clone(),
672 driver.clone(),
673 );
674 let order_timeout_task = spawn_order_timeout_monitor(
675 orchestrator.clone(),
676 event_bus.clone(),
677 alerts.clone(),
678 shutdown.clone(),
679 );
680
681 info!(
682 symbols = ?symbols,
683 category = ?settings.category,
684 metrics_addr = %settings.metrics_addr,
685 state_path = %settings.persistence.state_path.display(),
686 persistence_engine = ?settings.persistence.engine,
687 history = settings.history,
688 "market stream ready"
689 );
690
691 for symbol in &symbols {
692 let ctx = shared_risk_context(symbol, &portfolio, &market_cache, &persisted).await;
693 orchestrator.update_risk_context(symbol.clone(), ctx);
694 }
695
696 Ok(Self {
697 market,
698 orchestrator,
699 state_repo,
700 persisted,
701 event_bus,
702 recorder,
703 control_task: Some(control_task),
704 shutdown,
705 metrics_task,
706 alert_task,
707 reconciliation_task,
708 reconciliation_ctx,
709 private_event_rx,
710 last_private_sync,
711 subscriber_handles,
712 connection_monitors,
713 order_timeout_task,
714 strategy,
715 _public_connection: public_connection,
716 _private_connection: private_connection,
717 })
718 }
719
720 async fn run(mut self) -> Result<()> {
721 info!("live session started");
722 if let Some(ctx) = self.reconciliation_ctx.as_ref() {
723 perform_state_reconciliation(ctx.as_ref())
724 .await
725 .context("initial state reconciliation failed")?;
726 }
727 let backoff = Duration::from_millis(200);
728 let mut orchestrator_timer = tokio::time::interval(Duration::from_secs(1));
729
730 'run: while !self.shutdown.triggered() {
731 let mut progressed = false;
732
733 let tick = tokio::select! {
734 res = self.market.next_tick() => Some(res),
735 _ = self.shutdown.wait() => None,
736 };
737 match tick {
738 Some(res) => {
739 if let Some(tick) = res? {
740 progressed = true;
741 self.event_bus.publish(Event::Tick(TickEvent { tick }));
742 }
743 }
744 None => break 'run,
745 }
746
747 let candle = tokio::select! {
748 res = self.market.next_candle() => Some(res),
749 _ = self.shutdown.wait() => None,
750 };
751 match candle {
752 Some(res) => {
753 if let Some(candle) = res? {
754 progressed = true;
755 self.event_bus
756 .publish(Event::Candle(CandleEvent { candle }));
757 }
758 }
759 None => break 'run,
760 }
761
762 let book = tokio::select! {
763 res = self.market.next_order_book() => Some(res),
764 _ = self.shutdown.wait() => None,
765 };
766 match book {
767 Some(res) => {
768 if let Some(book) = res? {
769 progressed = true;
770 self.event_bus
771 .publish(Event::OrderBook(OrderBookEvent { order_book: book }));
772 }
773 }
774 None => break 'run,
775 }
776
777 tokio::select! {
778 biased;
779 Some(event) = self.private_event_rx.recv() => {
780 progressed = true;
781 match event {
782 BrokerEvent::OrderUpdate(order) => {
783 info!(
784 order_id = %order.id,
785 status = ?order.status,
786 symbol = %order.request.symbol,
787 "received private order update"
788 );
789 self.event_bus
790 .publish(Event::OrderUpdate(OrderUpdateEvent { order }));
791 }
792 BrokerEvent::Fill(fill) => {
793 info!(
794 order_id = %fill.order_id,
795 symbol = %fill.symbol,
796 qty = %fill.fill_quantity,
797 price = %fill.fill_price,
798 "received private fill"
799 );
800 self.event_bus.publish(Event::Fill(FillEvent { fill }));
801 }
802 }
803 }
804 _ = orchestrator_timer.tick() => {
805 if let Err(e) = self.orchestrator.on_timer_tick().await {
807 error!("Orchestrator timer tick failed: {}", e);
808 }
809 }
810 _ = self.shutdown.wait() => break 'run,
811 else => {}
812 }
813
814 if !progressed && !self.shutdown.sleep(backoff).await {
815 break;
816 }
817 }
818 info!("live session stopping");
819 self.metrics_task.abort();
820 if let Some(handle) = self.alert_task.take() {
821 handle.abort();
822 }
823 if let Some(handle) = self.reconciliation_task.take() {
824 handle.abort();
825 }
826 self.order_timeout_task.abort();
827 for handle in self.subscriber_handles.drain(..) {
828 handle.abort();
829 }
830 for handle in self.connection_monitors.drain(..) {
831 handle.abort();
832 }
833 if let Err(err) = persist_state(
834 self.state_repo.clone(),
835 self.persisted.clone(),
836 Some(self.strategy.clone()),
837 )
838 .await
839 {
840 warn!(error = %err, "failed to persist shutdown state");
841 }
842 if let Some(task) = self.control_task.take() {
843 if let Err(err) = task.await {
844 warn!(error = %err, "control plane server task aborted");
845 }
846 }
847 if let Some(recorder) = self.recorder.take() {
848 if let Err(err) = recorder.shutdown().await {
849 warn!(error = %err, "failed to flush flight recorder");
850 }
851 }
852 Ok(())
853 }
854}
855
856struct ReconciliationContext {
857 client: Arc<dyn ExecutionClient>,
858 portfolio: Arc<Mutex<Portfolio>>,
859 persisted: Arc<Mutex<LiveState>>,
860 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
861 alerts: Arc<AlertManager>,
862 metrics: Arc<LiveMetrics>,
863 reporting_currency: Symbol,
864 threshold: Decimal,
865}
866
867struct ReconciliationContextConfig {
868 client: Arc<dyn ExecutionClient>,
869 portfolio: Arc<Mutex<Portfolio>>,
870 persisted: Arc<Mutex<LiveState>>,
871 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
872 alerts: Arc<AlertManager>,
873 metrics: Arc<LiveMetrics>,
874 reporting_currency: Symbol,
875 threshold: Decimal,
876}
877
878impl ReconciliationContext {
879 fn new(config: ReconciliationContextConfig) -> Self {
880 let ReconciliationContextConfig {
881 client,
882 portfolio,
883 persisted,
884 state_repo,
885 alerts,
886 metrics,
887 reporting_currency,
888 threshold,
889 } = config;
890 let min_threshold = Decimal::new(1, 6); let threshold = if threshold <= Decimal::ZERO {
892 min_threshold
893 } else {
894 threshold
895 };
896 Self {
897 client,
898 portfolio,
899 persisted,
900 state_repo,
901 alerts,
902 metrics,
903 reporting_currency,
904 threshold,
905 }
906 }
907}
908
909fn spawn_reconciliation_loop(
910 ctx: Arc<ReconciliationContext>,
911 shutdown: ShutdownSignal,
912 interval: Duration,
913) -> JoinHandle<()> {
914 tokio::spawn(async move {
915 while shutdown.sleep(interval).await {
916 if let Err(err) = perform_state_reconciliation(ctx.as_ref()).await {
917 error!(error = %err, "periodic state reconciliation failed");
918 }
919 }
920 })
921}
922
923async fn perform_state_reconciliation(ctx: &ReconciliationContext) -> Result<()> {
924 info!("running state reconciliation");
925 let remote_positions = ctx
926 .client
927 .positions()
928 .await
929 .context("failed to fetch remote positions")?;
930 let remote_balances = ctx
931 .client
932 .account_balances()
933 .await
934 .context("failed to fetch remote balances")?;
935 let (local_positions, local_cash) = {
936 let guard = ctx.portfolio.lock().await;
937 (guard.positions(), guard.cash())
938 };
939
940 let remote_map = positions_to_map(remote_positions);
941 let local_map = positions_to_map(local_positions);
942 let mut tracked_symbols: HashSet<String> = HashSet::new();
943 tracked_symbols.extend(remote_map.keys().cloned());
944 tracked_symbols.extend(local_map.keys().cloned());
945
946 let mut severe_findings = Vec::new();
947 for symbol in tracked_symbols {
948 let local_qty = local_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
949 let remote_qty = remote_map.get(&symbol).copied().unwrap_or(Decimal::ZERO);
950 let diff = (local_qty - remote_qty).abs();
951 let diff_value = diff.to_f64().unwrap_or(0.0);
952 ctx.metrics.update_position_diff(&symbol, diff_value);
953 if diff > Decimal::ZERO {
954 warn!(
955 symbol = %symbol,
956 local = %local_qty,
957 remote = %remote_qty,
958 diff = %diff,
959 "position mismatch detected during reconciliation"
960 );
961 let pct = normalize_diff(diff, remote_qty);
962 if pct >= ctx.threshold {
963 error!(
964 symbol = %symbol,
965 local = %local_qty,
966 remote = %remote_qty,
967 diff = %diff,
968 pct = %pct,
969 "position mismatch exceeds threshold"
970 );
971 severe_findings.push(format!(
972 "{symbol} local={local_qty} remote={remote_qty} diff={diff}"
973 ));
974 }
975 }
976 }
977
978 let reporting = ctx.reporting_currency.as_str();
979 let remote_cash = remote_balances
980 .iter()
981 .find(|balance| balance.currency == reporting)
982 .map(|balance| balance.available)
983 .unwrap_or_else(|| Decimal::ZERO);
984 let cash_diff = (remote_cash - local_cash).abs();
985 ctx.metrics
986 .update_balance_diff(reporting, cash_diff.to_f64().unwrap_or(0.0));
987 if cash_diff > Decimal::ZERO {
988 warn!(
989 currency = %reporting,
990 local = %local_cash,
991 remote = %remote_cash,
992 diff = %cash_diff,
993 "balance mismatch detected during reconciliation"
994 );
995 let pct = normalize_diff(cash_diff, remote_cash);
996 if pct >= ctx.threshold {
997 error!(
998 currency = %reporting,
999 local = %local_cash,
1000 remote = %remote_cash,
1001 diff = %cash_diff,
1002 pct = %pct,
1003 "balance mismatch exceeds threshold"
1004 );
1005 severe_findings.push(format!(
1006 "{reporting} balance local={local_cash} remote={remote_cash} diff={cash_diff}"
1007 ));
1008 }
1009 }
1010
1011 if severe_findings.is_empty() {
1012 info!("state reconciliation complete with no critical divergence");
1013 return Ok(());
1014 }
1015
1016 let alert_body = severe_findings.join("; ");
1017 ctx.alerts
1018 .notify("State reconciliation divergence", &alert_body)
1019 .await;
1020 enforce_liquidate_only(ctx).await;
1021 Ok(())
1022}
1023
1024async fn enforce_liquidate_only(ctx: &ReconciliationContext) {
1025 let snapshot = {
1026 let mut guard = ctx.portfolio.lock().await;
1027 if !guard.set_liquidate_only(true) {
1028 return;
1029 }
1030 info!("entering liquidate-only mode due to reconciliation divergence");
1031 guard.snapshot()
1032 };
1033 {
1034 let mut state = ctx.persisted.lock().await;
1035 state.portfolio = Some(snapshot);
1036 }
1037 if let Err(err) = persist_state(ctx.state_repo.clone(), ctx.persisted.clone(), None).await {
1038 warn!(error = %err, "failed to persist liquidate-only transition");
1039 }
1040}
1041
1042fn positions_to_map(positions: Vec<Position>) -> HashMap<String, Decimal> {
1043 let mut map = HashMap::new();
1044 for position in positions {
1045 map.insert(position.symbol.clone(), position_signed_qty(&position));
1046 }
1047 map
1048}
1049
1050fn position_signed_qty(position: &Position) -> Decimal {
1051 match position.side {
1052 Some(Side::Buy) => position.quantity,
1053 Some(Side::Sell) => -position.quantity,
1054 None => Decimal::ZERO,
1055 }
1056}
1057
1058fn normalize_diff(diff: Decimal, reference: Decimal) -> Decimal {
1059 if diff <= Decimal::ZERO {
1060 Decimal::ZERO
1061 } else {
1062 let denominator = std::cmp::max(reference.abs(), Decimal::ONE);
1063 diff / denominator
1064 }
1065}
1066
1067fn build_exchange_payload(exchange: &ExchangeConfig, settings: &LiveSessionSettings) -> Value {
1068 let mut payload = serde_json::Map::new();
1069 payload.insert("rest_url".into(), Value::String(exchange.rest_url.clone()));
1070 payload.insert("ws_url".into(), Value::String(exchange.ws_url.clone()));
1071 payload.insert("api_key".into(), Value::String(exchange.api_key.clone()));
1072 payload.insert(
1073 "api_secret".into(),
1074 Value::String(exchange.api_secret.clone()),
1075 );
1076 payload.insert(
1077 "category".into(),
1078 Value::String(settings.category.as_path().to_string()),
1079 );
1080 payload.insert(
1081 "orderbook_depth".into(),
1082 Value::Number(serde_json::Number::from(settings.orderbook_depth as u64)),
1083 );
1084 if let Value::Object(extra) = exchange.params.clone() {
1085 for (key, value) in extra {
1086 payload.insert(key, value);
1087 }
1088 }
1089 Value::Object(payload)
1090}
1091
1092#[derive(Default)]
1093struct MarketSnapshot {
1094 last_trade: Option<Price>,
1095 last_trade_ts: Option<DateTime<Utc>>,
1096 last_candle: Option<Candle>,
1097}
1098
1099impl MarketSnapshot {
1100 fn price(&self) -> Option<Price> {
1101 self.last_trade
1102 .or_else(|| self.last_candle.as_ref().map(|c| c.close))
1103 }
1104}
1105
1106pub struct ShutdownSignal {
1107 flag: Arc<AtomicBool>,
1108 notify: Arc<Notify>,
1109}
1110
1111impl ShutdownSignal {
1112 pub fn new() -> Self {
1113 let flag = Arc::new(AtomicBool::new(false));
1114 let notify = Arc::new(Notify::new());
1115 let flag_clone = flag.clone();
1116 let notify_clone = notify.clone();
1117 tokio::spawn(async move {
1118 if tokio::signal::ctrl_c().await.is_ok() {
1119 flag_clone.store(true, Ordering::SeqCst);
1120 notify_clone.notify_waiters();
1121 }
1122 });
1123 Self { flag, notify }
1124 }
1125
1126 pub fn trigger(&self) {
1127 self.flag.store(true, Ordering::SeqCst);
1128 self.notify.notify_waiters();
1129 }
1130
1131 pub fn triggered(&self) -> bool {
1132 self.flag.load(Ordering::SeqCst)
1133 }
1134
1135 pub async fn wait(&self) {
1136 if self.triggered() {
1137 return;
1138 }
1139 self.notify.notified().await;
1140 }
1141
1142 async fn sleep(&self, duration: Duration) -> bool {
1143 tokio::select! {
1144 _ = tokio::time::sleep(duration) => true,
1145 _ = self.notify.notified() => false,
1146 }
1147 }
1148}
1149
1150impl Default for ShutdownSignal {
1151 fn default() -> Self {
1152 Self::new()
1153 }
1154}
1155
1156impl Clone for ShutdownSignal {
1157 fn clone(&self) -> Self {
1158 Self {
1159 flag: self.flag.clone(),
1160 notify: self.notify.clone(),
1161 }
1162 }
1163}
1164
1165#[allow(clippy::too_many_arguments)]
1166fn spawn_event_subscribers(
1167 bus: Arc<EventBus>,
1168 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1169 strategy_ctx: Arc<Mutex<StrategyContext>>,
1170 orchestrator: Arc<OrderOrchestrator>,
1171 portfolio: Arc<Mutex<Portfolio>>,
1172 metrics: Arc<LiveMetrics>,
1173 alerts: Arc<AlertManager>,
1174 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1175 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1176 persisted: Arc<Mutex<LiveState>>,
1177 exec_backend: ExecutionBackend,
1178 recorder: Option<RecorderHandle>,
1179 last_data_timestamp: Arc<AtomicI64>,
1180 driver: Arc<String>,
1181) -> Vec<JoinHandle<()>> {
1182 let mut handles = Vec::new();
1183 let market_recorder = recorder.clone();
1184
1185 let market_bus = bus.clone();
1186 let market_strategy = strategy.clone();
1187 let market_ctx = strategy_ctx.clone();
1188 let market_metrics = metrics.clone();
1189 let market_alerts = alerts.clone();
1190 let market_state = state_repo.clone();
1191 let market_persisted = persisted.clone();
1192 let market_portfolio = portfolio.clone();
1193 let market_snapshot = market.clone();
1194 let orchestrator_clone = orchestrator.clone();
1195 let market_data_tracker = last_data_timestamp.clone();
1196 let driver_clone = driver.clone();
1197 handles.push(tokio::spawn(async move {
1198 let recorder = market_recorder;
1199 let mut stream = market_bus.subscribe();
1200 loop {
1201 match stream.recv().await {
1202 Ok(Event::Tick(evt)) => {
1203 if let Some(handle) = recorder.as_ref() {
1204 handle.record_tick(evt.tick.clone());
1205 }
1206 if let Err(err) = process_tick_event(
1207 evt.tick,
1208 market_strategy.clone(),
1209 market_ctx.clone(),
1210 market_metrics.clone(),
1211 market_alerts.clone(),
1212 market_snapshot.clone(),
1213 market_portfolio.clone(),
1214 market_state.clone(),
1215 market_persisted.clone(),
1216 market_bus.clone(),
1217 market_data_tracker.clone(),
1218 )
1219 .await
1220 {
1221 warn!(error = %err, "tick handler failed");
1222 }
1223 }
1224 Ok(Event::Candle(evt)) => {
1225 if let Some(handle) = recorder.as_ref() {
1226 handle.record_candle(evt.candle.clone());
1227 }
1228 if let Err(err) = process_candle_event(
1229 evt.candle,
1230 market_strategy.clone(),
1231 market_ctx.clone(),
1232 market_metrics.clone(),
1233 market_alerts.clone(),
1234 market_snapshot.clone(),
1235 market_portfolio.clone(),
1236 orchestrator_clone.clone(),
1237 exec_backend,
1238 market_state.clone(),
1239 market_persisted.clone(),
1240 market_bus.clone(),
1241 market_data_tracker.clone(),
1242 )
1243 .await
1244 {
1245 warn!(error = %err, "candle handler failed");
1246 }
1247 }
1248 Ok(Event::OrderBook(evt)) => {
1249 if let Err(err) = process_order_book_event(
1250 evt.order_book,
1251 market_strategy.clone(),
1252 market_ctx.clone(),
1253 market_metrics.clone(),
1254 market_alerts.clone(),
1255 market_snapshot.clone(),
1256 market_bus.clone(),
1257 market_data_tracker.clone(),
1258 driver_clone.clone(),
1259 )
1260 .await
1261 {
1262 warn!(error = %err, "order book handler failed");
1263 }
1264 }
1265 Ok(_) => {}
1266 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1267 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1268 warn!(lag = lag, "market subscriber lagged");
1269 continue;
1270 }
1271 }
1272 }
1273 }));
1274
1275 let exec_bus = bus.clone();
1276 let exec_portfolio = portfolio.clone();
1277 let exec_market = market.clone();
1278 let exec_persisted = persisted.clone();
1279 let exec_alerts = alerts.clone();
1280 let exec_metrics = metrics.clone();
1281 let exec_orchestrator = orchestrator.clone();
1282 let exec_recorder = recorder.clone();
1283 handles.push(tokio::spawn(async move {
1284 let orchestrator = exec_orchestrator.clone();
1285 let recorder = exec_recorder;
1286 let mut stream = exec_bus.subscribe();
1287 loop {
1288 match stream.recv().await {
1289 Ok(Event::Signal(evt)) => {
1290 if let Some(handle) = recorder.as_ref() {
1291 handle.record_signal(evt.signal.clone());
1292 }
1293 if let Err(err) = process_signal_event(
1294 evt.signal,
1295 orchestrator.clone(),
1296 exec_portfolio.clone(),
1297 exec_market.clone(),
1298 exec_persisted.clone(),
1299 exec_alerts.clone(),
1300 exec_metrics.clone(),
1301 )
1302 .await
1303 {
1304 warn!(error = %err, "signal handler failed");
1305 }
1306 }
1307 Ok(_) => {}
1308 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1309 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1310 warn!(lag = lag, "signal subscriber lagged");
1311 continue;
1312 }
1313 }
1314 }
1315 }));
1316
1317 let fill_bus = bus.clone();
1318 let fill_state = state_repo.clone();
1319 let fill_orchestrator = orchestrator.clone();
1320 let fill_persisted = persisted.clone();
1321 let fill_alerts = alerts.clone();
1322 let fill_recorder = recorder.clone();
1323 handles.push(tokio::spawn(async move {
1324 let orchestrator = fill_orchestrator.clone();
1325 let persisted = fill_persisted.clone();
1326 let recorder = fill_recorder;
1327 let mut stream = fill_bus.subscribe();
1328 loop {
1329 match stream.recv().await {
1330 Ok(Event::Fill(evt)) => {
1331 if let Some(handle) = recorder.as_ref() {
1332 handle.record_fill(evt.fill.clone());
1333 }
1334 if let Err(err) = process_fill_event(
1335 evt.fill,
1336 portfolio.clone(),
1337 strategy.clone(),
1338 strategy_ctx.clone(),
1339 orchestrator.clone(),
1340 metrics.clone(),
1341 fill_alerts.clone(),
1342 fill_state.clone(),
1343 persisted.clone(),
1344 )
1345 .await
1346 {
1347 warn!(error = %err, "fill handler failed");
1348 }
1349 }
1350 Ok(_) => {}
1351 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1352 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1353 warn!(lag = lag, "fill subscriber lagged");
1354 continue;
1355 }
1356 }
1357 }
1358 }));
1359
1360 let order_bus = bus.clone();
1361 let order_persisted = persisted.clone();
1362 let order_alerts = alerts.clone();
1363 let order_orchestrator = orchestrator.clone();
1364 let order_recorder = recorder;
1367 handles.push(tokio::spawn(async move {
1368 let orchestrator = order_orchestrator.clone();
1369 let persisted = order_persisted.clone();
1370 let recorder = order_recorder;
1371 let mut stream = order_bus.subscribe();
1372 loop {
1373 match stream.recv().await {
1374 Ok(Event::OrderUpdate(evt)) => {
1375 if let Some(handle) = recorder.as_ref() {
1376 handle.record_order(evt.order.clone());
1377 }
1378 if let Err(err) = process_order_update_event(
1379 evt.order,
1380 orchestrator.clone(),
1381 order_alerts.clone(),
1382 state_repo.clone(),
1383 persisted.clone(),
1384 )
1385 .await
1386 {
1387 warn!(error = %err, "order update handler failed");
1388 }
1389 }
1390 Ok(_) => {}
1391 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1392 Err(tokio::sync::broadcast::error::RecvError::Lagged(lag)) => {
1393 warn!(lag = lag, "order subscriber lagged");
1394 continue;
1395 }
1396 }
1397 }
1398 }));
1399
1400 handles
1401}
1402
1403#[allow(clippy::too_many_arguments)]
1404async fn process_tick_event(
1405 tick: Tick,
1406 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1407 strategy_ctx: Arc<Mutex<StrategyContext>>,
1408 metrics: Arc<LiveMetrics>,
1409 alerts: Arc<AlertManager>,
1410 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1411 portfolio: Arc<Mutex<Portfolio>>,
1412 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1413 persisted: Arc<Mutex<LiveState>>,
1414 bus: Arc<EventBus>,
1415 last_data_timestamp: Arc<AtomicI64>,
1416) -> Result<()> {
1417 metrics.inc_tick();
1418 metrics.update_staleness(0.0);
1419 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1420 last_data_timestamp.store(tick.exchange_timestamp.timestamp(), Ordering::SeqCst);
1421 alerts.heartbeat().await;
1422 {
1423 let mut guard = market.lock().await;
1424 if let Some(snapshot) = guard.get_mut(&tick.symbol) {
1425 snapshot.last_trade = Some(tick.price);
1426 snapshot.last_trade_ts = Some(tick.exchange_timestamp);
1427 }
1428 }
1429 let mut drawdown_triggered = false;
1430 let mut snapshot_on_trigger = None;
1431 {
1432 let mut guard = portfolio.lock().await;
1433 let was_liquidate_only = guard.liquidate_only();
1434 match guard.update_market_data(&tick.symbol, tick.price) {
1435 Ok(_) => {
1436 if !was_liquidate_only && guard.liquidate_only() {
1437 drawdown_triggered = true;
1438 snapshot_on_trigger = Some(guard.snapshot());
1439 }
1440 }
1441 Err(err) => {
1442 warn!(
1443 symbol = %tick.symbol,
1444 error = %err,
1445 "failed to refresh market data"
1446 );
1447 }
1448 }
1449 }
1450 {
1451 let mut state = persisted.lock().await;
1452 state.last_prices.insert(tick.symbol.clone(), tick.price);
1453 if drawdown_triggered {
1454 if let Some(snapshot) = snapshot_on_trigger.take() {
1455 state.portfolio = Some(snapshot);
1456 }
1457 }
1458 }
1459 if drawdown_triggered {
1460 persist_state(
1461 state_repo.clone(),
1462 persisted.clone(),
1463 Some(strategy.clone()),
1464 )
1465 .await?;
1466 alert_liquidate_only(alerts.clone()).await;
1467 }
1468 {
1469 let mut ctx = strategy_ctx.lock().await;
1470 ctx.push_tick(tick.clone());
1471 let lock_start = Instant::now();
1472 let mut strat = strategy.lock().await;
1473 log_strategy_lock("tick", lock_start.elapsed());
1474 let call_start = Instant::now();
1475 strat
1476 .on_tick(&ctx, &tick)
1477 .await
1478 .context("strategy failure on tick event")?;
1479 log_strategy_call("tick", call_start.elapsed());
1480 }
1481 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1482 debug!(symbol = %tick.symbol, price = %tick.price, "completed tick processing");
1483 Ok(())
1484}
1485
1486#[allow(clippy::too_many_arguments)]
1487async fn process_candle_event(
1488 candle: Candle,
1489 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1490 strategy_ctx: Arc<Mutex<StrategyContext>>,
1491 metrics: Arc<LiveMetrics>,
1492 alerts: Arc<AlertManager>,
1493 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1494 portfolio: Arc<Mutex<Portfolio>>,
1495 orchestrator: Arc<OrderOrchestrator>,
1496 exec_backend: ExecutionBackend,
1497 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1498 persisted: Arc<Mutex<LiveState>>,
1499 bus: Arc<EventBus>,
1500 last_data_timestamp: Arc<AtomicI64>,
1501) -> Result<()> {
1502 metrics.inc_candle();
1503 metrics.update_staleness(0.0);
1504 metrics.update_last_data_timestamp(Utc::now().timestamp() as f64);
1505 last_data_timestamp.store(candle.timestamp.timestamp(), Ordering::SeqCst);
1506 alerts.heartbeat().await;
1507 metrics.update_price(&candle.symbol, candle.close.to_f64().unwrap_or(0.0));
1508 {
1509 let mut guard = market.lock().await;
1510 if let Some(snapshot) = guard.get_mut(&candle.symbol) {
1511 snapshot.last_candle = Some(candle.clone());
1512 snapshot.last_trade = Some(candle.close);
1513 }
1514 }
1515 if exec_backend.is_paper() {
1516 let client = orchestrator.execution_engine().client();
1517 if let Some(paper) = client.as_any().downcast_ref::<PaperExecutionClient>() {
1518 paper.update_price(&candle.symbol, candle.close);
1519 }
1520 }
1521 let mut candle_drawdown_triggered = false;
1522 let mut candle_snapshot = None;
1523 {
1524 let mut guard = portfolio.lock().await;
1525 let was_liquidate_only = guard.liquidate_only();
1526 match guard.update_market_data(&candle.symbol, candle.close) {
1527 Ok(_) => {
1528 if !was_liquidate_only && guard.liquidate_only() {
1529 candle_drawdown_triggered = true;
1530 candle_snapshot = Some(guard.snapshot());
1531 }
1532 }
1533 Err(err) => {
1534 warn!(
1535 symbol = %candle.symbol,
1536 error = %err,
1537 "failed to refresh market data"
1538 );
1539 }
1540 }
1541 }
1542 if candle_drawdown_triggered {
1543 if let Some(snapshot) = candle_snapshot.take() {
1544 let mut persisted_guard = persisted.lock().await;
1545 persisted_guard.portfolio = Some(snapshot);
1546 }
1547 alert_liquidate_only(alerts.clone()).await;
1548 }
1549 {
1550 let mut ctx = strategy_ctx.lock().await;
1551 ctx.push_candle(candle.clone());
1552 let lock_start = Instant::now();
1553 let mut strat = strategy.lock().await;
1554 log_strategy_lock("candle", lock_start.elapsed());
1555 let call_start = Instant::now();
1556 strat
1557 .on_candle(&ctx, &candle)
1558 .await
1559 .context("strategy failure on candle event")?;
1560 log_strategy_call("candle", call_start.elapsed());
1561 }
1562 {
1563 let mut snapshot = persisted.lock().await;
1564 snapshot.last_candle_ts = Some(candle.timestamp);
1565 snapshot
1566 .last_prices
1567 .insert(candle.symbol.clone(), candle.close);
1568 }
1569 persist_state(
1570 state_repo.clone(),
1571 persisted.clone(),
1572 Some(strategy.clone()),
1573 )
1574 .await?;
1575 let ctx = shared_risk_context(&candle.symbol, &portfolio, &market, &persisted).await;
1576 orchestrator.update_risk_context(candle.symbol.clone(), ctx);
1577 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1578 debug!(symbol = %candle.symbol, close = %candle.close, "completed candle processing");
1579 Ok(())
1580}
1581
1582#[allow(clippy::too_many_arguments)]
1583async fn process_order_book_event(
1584 mut book: OrderBook,
1585 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1586 strategy_ctx: Arc<Mutex<StrategyContext>>,
1587 metrics: Arc<LiveMetrics>,
1588 alerts: Arc<AlertManager>,
1589 _market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1590 bus: Arc<EventBus>,
1591 last_data_timestamp: Arc<AtomicI64>,
1592 driver: Arc<String>,
1593) -> Result<()> {
1594 metrics.update_staleness(0.0);
1595 alerts.heartbeat().await;
1596 last_data_timestamp.store(book.timestamp.timestamp(), Ordering::SeqCst);
1597 let driver_name = driver.as_str();
1598 let local_checksum = if let Some(cs) = book.local_checksum {
1599 cs
1600 } else {
1601 let computed = book.computed_checksum(None);
1602 book.local_checksum = Some(computed);
1603 computed
1604 };
1605 if let Some(expected) = book.exchange_checksum {
1606 if expected != local_checksum {
1607 metrics.inc_checksum_mismatch(driver_name, &book.symbol);
1608 alerts
1609 .order_book_checksum_mismatch(driver_name, &book.symbol, expected, local_checksum)
1610 .await;
1611 }
1612 }
1613 {
1614 let mut ctx = strategy_ctx.lock().await;
1615 ctx.push_order_book(book.clone());
1616 let lock_start = Instant::now();
1617 let mut strat = strategy.lock().await;
1618 log_strategy_lock("order_book", lock_start.elapsed());
1619 let call_start = Instant::now();
1620 strat
1621 .on_order_book(&ctx, &book)
1622 .await
1623 .context("strategy failure on order book")?;
1624 log_strategy_call("order_book", call_start.elapsed());
1625 }
1626 emit_signals(strategy.clone(), bus.clone(), metrics.clone()).await;
1627 Ok(())
1628}
1629
1630async fn process_signal_event(
1631 signal: Signal,
1632 orchestrator: Arc<OrderOrchestrator>,
1633 portfolio: Arc<Mutex<Portfolio>>,
1634 market: Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1635 persisted: Arc<Mutex<LiveState>>,
1636 alerts: Arc<AlertManager>,
1637 metrics: Arc<LiveMetrics>,
1638) -> Result<()> {
1639 let ctx = shared_risk_context(&signal.symbol, &portfolio, &market, &persisted).await;
1640 orchestrator.update_risk_context(signal.symbol.clone(), ctx);
1641 match orchestrator.on_signal(&signal, &ctx).await {
1642 Ok(()) => {
1643 alerts.reset_order_failures().await;
1644 }
1645 Err(err) => {
1646 metrics.inc_order_failure();
1647 alerts
1648 .order_failure(&format!("orchestrator error: {err}"))
1649 .await;
1650 }
1651 }
1652 Ok(())
1653}
1654
1655fn log_strategy_lock(event: &str, wait: Duration) {
1656 let wait_ms = wait.as_secs_f64() * 1000.0;
1657 if wait >= STRATEGY_LOCK_WARN_THRESHOLD {
1658 warn!(target: "strategy", event, wait_ms, "strategy lock wait exceeded threshold");
1659 } else {
1660 trace!(target: "strategy", event, wait_ms, "strategy lock acquired");
1661 }
1662}
1663
1664fn log_strategy_call(event: &str, elapsed: Duration) {
1665 let duration_ms = elapsed.as_secs_f64() * 1000.0;
1666 if elapsed >= STRATEGY_CALL_WARN_THRESHOLD {
1667 warn!(target: "strategy", event, duration_ms, "strategy call latency above threshold");
1668 } else {
1669 trace!(target: "strategy", event, duration_ms, "strategy call completed");
1670 }
1671}
1672
1673#[allow(clippy::too_many_arguments)]
1674async fn process_fill_event(
1675 fill: Fill,
1676 portfolio: Arc<Mutex<Portfolio>>,
1677 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1678 strategy_ctx: Arc<Mutex<StrategyContext>>,
1679 orchestrator: Arc<OrderOrchestrator>,
1680 metrics: Arc<LiveMetrics>,
1681 alerts: Arc<AlertManager>,
1682 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1683 persisted: Arc<Mutex<LiveState>>,
1684) -> Result<()> {
1685 let mut drawdown_triggered = false;
1686 {
1687 let mut guard = portfolio.lock().await;
1688 let was_liquidate_only = guard.liquidate_only();
1689 guard
1690 .apply_fill(&fill)
1691 .context("Failed to apply fill to portfolio")?;
1692 if !was_liquidate_only && guard.liquidate_only() {
1693 drawdown_triggered = true;
1694 }
1695 let snapshot = guard.snapshot();
1696 let mut persisted_guard = persisted.lock().await;
1697 persisted_guard.portfolio = Some(snapshot);
1698 }
1699 {
1700 let positions = {
1701 let guard = portfolio.lock().await;
1702 guard.positions()
1703 };
1704 let mut ctx = strategy_ctx.lock().await;
1705 ctx.update_positions(positions);
1706 }
1707 orchestrator.on_fill(&fill).await.ok();
1708 {
1709 let ctx = strategy_ctx.lock().await;
1710 let lock_start = Instant::now();
1711 let mut strat = strategy.lock().await;
1712 log_strategy_lock("fill", lock_start.elapsed());
1713 let call_start = Instant::now();
1714 strat
1715 .on_fill(&ctx, &fill)
1716 .await
1717 .context("Strategy failed on fill event")?;
1718 log_strategy_call("fill", call_start.elapsed());
1719 }
1720 let equity = {
1721 let guard = portfolio.lock().await;
1722 guard.equity()
1723 };
1724 if let Some(value) = equity.to_f64() {
1725 metrics.update_equity(value);
1726 }
1727 alerts.update_equity(equity).await;
1728 metrics.inc_order();
1729 alerts
1730 .notify(
1731 "Order Filled",
1732 &format!(
1733 "order filled: {}@{} ({})",
1734 fill.fill_quantity,
1735 fill.fill_price,
1736 match fill.side {
1737 Side::Buy => "buy",
1738 Side::Sell => "sell",
1739 }
1740 ),
1741 )
1742 .await;
1743 if drawdown_triggered {
1744 alert_liquidate_only(alerts.clone()).await;
1745 }
1746 persist_state(
1747 state_repo.clone(),
1748 persisted.clone(),
1749 Some(strategy.clone()),
1750 )
1751 .await?;
1752 Ok(())
1753}
1754
1755async fn process_order_update_event(
1756 order: Order,
1757 orchestrator: Arc<OrderOrchestrator>,
1758 alerts: Arc<AlertManager>,
1759 state_repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1760 persisted: Arc<Mutex<LiveState>>,
1761) -> Result<()> {
1762 orchestrator.on_order_update(&order);
1763 if matches!(order.status, OrderStatus::Rejected) {
1764 error!(
1765 order_id = %order.id,
1766 symbol = %order.request.symbol,
1767 "order rejected by exchange"
1768 );
1769 alerts.order_failure("order rejected by exchange").await;
1770 alerts
1771 .notify(
1772 "Order rejected",
1773 &format!(
1774 "Order {} for {} was rejected",
1775 order.id, order.request.symbol
1776 ),
1777 )
1778 .await;
1779 }
1780 {
1781 let mut snapshot = persisted.lock().await;
1782 let mut found = false;
1783 for existing in &mut snapshot.open_orders {
1784 if existing.id == order.id {
1785 *existing = order.clone();
1786 found = true;
1787 break;
1788 }
1789 }
1790 if !found {
1791 snapshot.open_orders.push(order.clone());
1792 }
1793 if matches!(
1794 order.status,
1795 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
1796 ) {
1797 snapshot.open_orders.retain(|o| o.id != order.id);
1798 }
1799 }
1800 persist_state(state_repo, persisted, None).await?;
1801 Ok(())
1802}
1803
1804async fn emit_signals(
1805 strategy: Arc<Mutex<Box<dyn Strategy>>>,
1806 bus: Arc<EventBus>,
1807 metrics: Arc<LiveMetrics>,
1808) {
1809 let signals = {
1810 let mut strat = strategy.lock().await;
1811 let drained = strat.drain_signals();
1812 debug!(count = drained.len(), "strategy drained signals");
1813 drained
1814 };
1815 if signals.is_empty() {
1816 return;
1817 }
1818 metrics.inc_signals(signals.len());
1819 for signal in signals {
1820 debug!(id = %signal.id, symbol = %signal.symbol, kind = ?signal.kind, "publishing signal event");
1821 bus.publish(Event::Signal(SignalEvent { signal }));
1822 }
1823}
1824
1825async fn persist_state(
1826 repo: Arc<dyn StateRepository<Snapshot = LiveState>>,
1827 persisted: Arc<Mutex<LiveState>>,
1828 strategy: Option<Arc<Mutex<Box<dyn Strategy>>>>,
1829) -> Result<()> {
1830 if let Some(strat_lock) = strategy {
1831 let strat = strat_lock.lock().await;
1833 if let Ok(json_state) = strat.snapshot() {
1834 let mut guard = persisted.lock().await;
1835 guard.strategy_state = Some(json_state);
1836 } else {
1837 warn!("failed to snapshot strategy state");
1838 }
1839 }
1840
1841 let snapshot = {
1842 let guard = persisted.lock().await;
1843 guard.clone()
1844 };
1845 tokio::task::spawn_blocking(move || repo.save(&snapshot))
1846 .await
1847 .map_err(|err| anyhow!("state persistence task failed: {err}"))?
1848 .map_err(|err| anyhow!(err.to_string()))
1849}
1850
1851async fn shared_risk_context(
1852 symbol: &str,
1853 portfolio: &Arc<Mutex<Portfolio>>,
1854 market: &Arc<Mutex<HashMap<String, MarketSnapshot>>>,
1855 persisted: &Arc<Mutex<LiveState>>,
1856) -> RiskContext {
1857 let (signed_qty, equity, liquidate_only) = {
1858 let guard = portfolio.lock().await;
1859 (
1860 guard.signed_position_qty(symbol),
1861 guard.equity(),
1862 guard.liquidate_only(),
1863 )
1864 };
1865 let observed_price = {
1866 let guard = market.lock().await;
1867 guard.get(symbol).and_then(|snapshot| snapshot.price())
1868 };
1869 let last_price = if let Some(price) = observed_price {
1870 price
1871 } else {
1872 let guard = persisted.lock().await;
1873 guard
1874 .last_prices
1875 .get(symbol)
1876 .copied()
1877 .unwrap_or(Decimal::ZERO)
1878 };
1879 RiskContext {
1880 signed_position_qty: signed_qty,
1881 portfolio_equity: equity,
1882 last_price,
1883 liquidate_only,
1884 }
1885}
1886
1887async fn alert_liquidate_only(alerts: Arc<AlertManager>) {
1888 alerts
1889 .notify(
1890 "Max drawdown triggered",
1891 "Portfolio entered liquidate-only mode; new exposure blocked until review",
1892 )
1893 .await;
1894}
1895
1896fn spawn_connection_monitor(
1897 shutdown: ShutdownSignal,
1898 flag: Arc<AtomicBool>,
1899 metrics: Arc<LiveMetrics>,
1900 stream: &'static str,
1901) -> JoinHandle<()> {
1902 tokio::spawn(async move {
1903 loop {
1904 metrics.update_connection_status(stream, flag.load(Ordering::SeqCst));
1905 if !shutdown.sleep(Duration::from_secs(5)).await {
1906 break;
1907 }
1908 }
1909 })
1910}
1911
1912fn spawn_order_timeout_monitor(
1913 orchestrator: Arc<OrderOrchestrator>,
1914 bus: Arc<EventBus>,
1915 alerts: Arc<AlertManager>,
1916 shutdown: ShutdownSignal,
1917) -> JoinHandle<()> {
1918 tokio::spawn(async move {
1919 let mut ticker = tokio::time::interval(tesser_execution::orchestrator::ORDER_POLL_INTERVAL);
1920 loop {
1921 ticker.tick().await;
1922 if shutdown.triggered() {
1923 break;
1924 }
1925 match orchestrator.poll_stale_orders().await {
1926 Ok(updates) => {
1927 for order in updates {
1928 if matches!(order.status, OrderStatus::Rejected | OrderStatus::Canceled) {
1929 let message = format!(
1930 "Order {} for {} timed out after {}s",
1931 order.id,
1932 order.request.symbol,
1933 tesser_execution::orchestrator::ORDER_TIMEOUT.as_secs()
1934 );
1935 error!(%message);
1936 alerts.order_failure(&message).await;
1937 alerts.notify("Order timeout", &message).await;
1938 }
1939 bus.publish(Event::OrderUpdate(OrderUpdateEvent { order }));
1940 }
1941 }
1942 Err(err) => {
1943 warn!(error = %err, "order timeout monitor failed");
1944 }
1945 }
1946 }
1947 })
1948}
1949
1950async fn load_market_registry(
1951 client: Arc<dyn ExecutionClient>,
1952 settings: &LiveSessionSettings,
1953) -> Result<Arc<MarketRegistry>> {
1954 if let Some(path) = &settings.markets_file {
1955 let registry = MarketRegistry::load_from_file(path)
1956 .with_context(|| format!("failed to load markets from {}", path.display()))?;
1957 return Ok(Arc::new(registry));
1958 }
1959
1960 if settings.exec_backend.is_paper() {
1961 return Err(anyhow!(
1962 "paper execution requires --markets-file when exchange metadata is unavailable"
1963 ));
1964 }
1965
1966 let instruments = client
1967 .list_instruments(settings.category.as_path())
1968 .await
1969 .context("failed to fetch instruments from execution client")?;
1970 let registry =
1971 MarketRegistry::from_instruments(instruments).map_err(|err| anyhow!(err.to_string()))?;
1972 Ok(Arc::new(registry))
1973}
1974
/// Spawns the Bybit private WebSocket task.
///
/// The task loops until shutdown is triggered:
/// 1. connect to the private stream; on failure mark the stream as
///    disconnected, wait five seconds and retry;
/// 2. after connecting, reconcile state by forwarding the open orders of
///    every symbol and, when the execution client is a `BybitClient`, all
///    executions since the last successful sync (or the last 30 minutes when
///    there is none);
/// 3. forward `order` and `execution` topic messages as broker events until
///    the socket stream ends; other messages and messages that fail to parse
///    are ignored;
/// 4. mark the stream as disconnected and reconnect.
///
/// The sync timestamp only advances when the execution query succeeded, so a
/// failed reconciliation window is retried on the next connect.
#[cfg(feature = "bybit")]
#[allow(clippy::too_many_arguments)]
fn spawn_bybit_private_stream(
    creds: BybitCredentials,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    exec_client: Arc<dyn ExecutionClient>,
    symbols: Vec<String>,
    last_sync: Arc<tokio::sync::Mutex<Option<DateTime<Utc>>>>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            match tesser_bybit::ws::connect_private(
                &ws_url,
                &creds,
                private_connection_flag.clone(),
            )
            .await
            {
                Ok(mut socket) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    info!("Connected to Bybit private WebSocket stream");

                    // Replay open orders so updates missed while disconnected are observed.
                    for symbol in &symbols {
                        match exec_client.list_open_orders(symbol).await {
                            Ok(orders) => {
                                for order in orders {
                                    if let Err(err) =
                                        private_tx.send(BrokerEvent::OrderUpdate(order)).await
                                    {
                                        error!("failed to send reconciled order update: {err}");
                                    }
                                }
                            }
                            Err(e) => {
                                error!("failed to reconcile open orders for {symbol}: {e}");
                            }
                        }
                    }

                    // Replay executions that happened since the last successful sync.
                    if let Some(bybit) = exec_client.as_any().downcast_ref::<BybitClient>() {
                        let since = {
                            let guard = last_sync.lock().await;
                            guard.unwrap_or_else(|| Utc::now() - chrono::Duration::minutes(30))
                        };
                        match bybit.list_executions_since(since).await {
                            Ok(fills) => {
                                for fill in fills {
                                    if let Err(err) = private_tx.send(BrokerEvent::Fill(fill)).await
                                    {
                                        error!("failed to send reconciled fill: {err}");
                                    }
                                }
                                // Advance the marker only on success; otherwise the failed
                                // window would never be queried again.
                                *last_sync.lock().await = Some(Utc::now());
                            }
                            Err(e) => {
                                error!("failed to reconcile executions since {:?}: {}", since, e);
                            }
                        }
                    }

                    while let Some(msg) = socket.next().await {
                        if shutdown.triggered() {
                            break;
                        }
                        let Ok(Message::Text(text)) = msg else {
                            continue;
                        };
                        let Ok(value) = serde_json::from_str::<serde_json::Value>(&text) else {
                            continue;
                        };
                        let Some(topic) = value.get("topic").and_then(|v| v.as_str()) else {
                            continue;
                        };
                        match topic {
                            "order" => {
                                if let Ok(msg) = serde_json::from_value::<
                                    PrivateMessage<BybitWsOrder>,
                                >(value.clone())
                                {
                                    for update in msg.data {
                                        if let Ok(order) = update.to_tesser_order(None) {
                                            if let Err(err) = private_tx
                                                .send(BrokerEvent::OrderUpdate(order))
                                                .await
                                            {
                                                error!(
                                                    "failed to send private order update: {err}"
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                            "execution" => {
                                if let Ok(msg) = serde_json::from_value::<
                                    PrivateMessage<BybitWsExecution>,
                                >(value.clone())
                                {
                                    for exec in msg.data {
                                        if let Ok(fill) = exec.to_tesser_fill() {
                                            if let Err(err) =
                                                private_tx.send(BrokerEvent::Fill(fill)).await
                                            {
                                                error!("failed to send private fill event: {err}");
                                            }
                                        }
                                    }
                                }
                            }
                            _ => {}
                        }
                    }

                    // The stream ended (or shutdown was requested): report the private
                    // channel as disconnected until the next successful connect.
                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                }
                Err(e) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(false, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", false);
                    error!("Bybit private WebSocket connection failed: {e}. Retrying...");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
            if shutdown.triggered() {
                break;
            }
        }
    });
}
2112
/// Spawns the Binance user-data stream task.
///
/// The task returns immediately when the execution client is not a
/// `BinanceClient`. Otherwise it loops until shutdown:
/// 1. start a user stream to obtain a listen key; on failure wait five
///    seconds and retry, unless shutdown was triggered in the meantime;
/// 2. connect the user-data WebSocket and register a callback that forwards
///    order updates and fills as broker events and signals a reconnect when
///    the listen key expires;
/// 3. run a keepalive task every 30 minutes, which stops on the first
///    keepalive failure;
/// 4. wait for either a listen-key expiry (reconnect) or shutdown (exit),
///    then abort the keepalive and unsubscribe;
/// 5. mark the stream as disconnected and, unless shutting down, wait five
///    seconds before reconnecting.
#[cfg(feature = "binance")]
#[allow(clippy::too_many_arguments)]
fn spawn_binance_private_stream(
    exec_client: Arc<dyn ExecutionClient>,
    ws_url: String,
    private_tx: mpsc::Sender<BrokerEvent>,
    private_connection_flag: Option<Arc<AtomicBool>>,
    metrics: Arc<LiveMetrics>,
    shutdown: ShutdownSignal,
) {
    tokio::spawn(async move {
        loop {
            let Some(binance) = exec_client
                .as_ref()
                .as_any()
                .downcast_ref::<BinanceClient>()
            else {
                warn!("execution client is not Binance");
                return;
            };
            let listen_key = match binance.start_user_stream().await {
                Ok(key) => key,
                Err(err) => {
                    error!("failed to start Binance user stream: {err}");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    // Without this check a persistent failure would keep the task
                    // retrying forever after shutdown was requested.
                    if shutdown.triggered() {
                        break;
                    }
                    continue;
                }
            };
            match BinanceUserDataStream::connect(&ws_url, &listen_key).await {
                Ok(user_stream) => {
                    if let Some(flag) = &private_connection_flag {
                        flag.store(true, Ordering::SeqCst);
                    }
                    metrics.update_connection_status("private", true);
                    let (reconnect_tx, mut reconnect_rx) = mpsc::channel(1);
                    let tx_orders = private_tx.clone();
                    // NOTE(review): `blocking_send` panics when called from inside an async
                    // runtime context — confirm the callback runs on a non-runtime thread.
                    user_stream.on_event(move |event| {
                        if let Some(update) = extract_order_update(&event) {
                            if let Some(order) = order_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::OrderUpdate(order));
                            }
                            if let Some(fill) = fill_from_update(update) {
                                let _ = tx_orders.blocking_send(BrokerEvent::Fill(fill));
                            }
                        }
                        if matches!(event, UserDataStreamEventsResponse::ListenKeyExpired(_)) {
                            let _ = reconnect_tx.try_send(());
                        }
                    });
                    let keepalive_client = exec_client.clone();
                    let keepalive_handle = tokio::spawn(async move {
                        let mut interval = tokio::time::interval(Duration::from_secs(30 * 60));
                        loop {
                            interval.tick().await;
                            let Some(client) = keepalive_client
                                .as_ref()
                                .as_any()
                                .downcast_ref::<BinanceClient>()
                            else {
                                break;
                            };
                            if client.keepalive_user_stream().await.is_err() {
                                break;
                            }
                        }
                    });
                    tokio::select! {
                        _ = reconnect_rx.recv() => {
                            warn!("binance listen key expired; reconnecting");
                        }
                        _ = shutdown.wait() => {
                            keepalive_handle.abort();
                            let _ = user_stream.unsubscribe().await;
                            return;
                        }
                    }
                    keepalive_handle.abort();
                    let _ = user_stream.unsubscribe().await;
                }
                Err(err) => {
                    error!("failed to connect to Binance user stream: {err}");
                }
            }
            if let Some(flag) = &private_connection_flag {
                flag.store(false, Ordering::SeqCst);
            }
            metrics.update_connection_status("private", false);
            if shutdown.triggered() {
                break;
            }
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    });
}