use barter_instrument::{
exchange::{ExchangeId, ExchangeIndex},
index::IndexedInstruments,
};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
#[derive(Debug, Clone, Eq, PartialEq, Default, Deserialize, Serialize)]
pub struct ConnectivityStates {
pub global: Health,
pub exchanges: IndexMap<ExchangeId, ConnectivityState>,
}
impl ConnectivityStates {
pub fn update_from_account_reconnecting(&mut self, exchange: &ExchangeId) {
warn!(%exchange, "EngineState received AccountStream disconnected event");
self.global = Health::Reconnecting;
self.connectivity_mut(exchange).account = Health::Reconnecting;
}
pub fn update_from_account_event(&mut self, exchange: &ExchangeIndex) {
if self.global == Health::Healthy {
return;
}
let state = self.connectivity_index_mut(exchange);
if state.account == Health::Healthy {
return;
}
info!(
%exchange,
"EngineState received AccountStream event - setting connection to Healthy"
);
state.account = Health::Healthy;
if self.exchange_states().all(ConnectivityState::all_healthy) {
info!("EngineState setting global connectivity to Healthy");
self.global = Health::Healthy
}
}
pub fn update_from_market_reconnecting(&mut self, exchange: &ExchangeId) {
warn!(%exchange, "EngineState received MarketStream disconnect event");
self.global = Health::Reconnecting;
self.connectivity_mut(exchange).market_data = Health::Reconnecting
}
pub fn update_from_market_event(&mut self, exchange: &ExchangeId) {
if self.global == Health::Healthy {
return;
}
let state = self.connectivity_mut(exchange);
if state.market_data == Health::Healthy {
return;
}
info!(
%exchange,
"EngineState received MarketStream event - setting connection to Healthy"
);
state.market_data = Health::Healthy;
if self.exchange_states().all(ConnectivityState::all_healthy) {
info!("EngineState setting global connectivity to Healthy");
self.global = Health::Healthy
}
}
pub fn connectivity_index(&self, key: &ExchangeIndex) -> &ConnectivityState {
self.exchanges
.get_index(key.index())
.map(|(_key, state)| state)
.unwrap_or_else(|| panic!("ConnectivityStates does not contain: {key}"))
}
pub fn connectivity_index_mut(&mut self, key: &ExchangeIndex) -> &mut ConnectivityState {
self.exchanges
.get_index_mut(key.index())
.map(|(_key, state)| state)
.unwrap_or_else(|| panic!("ConnectivityStates does not contain: {key}"))
}
pub fn connectivity(&self, key: &ExchangeId) -> &ConnectivityState {
self.exchanges
.get(key)
.unwrap_or_else(|| panic!("ConnectivityStates does not contain: {key}"))
}
pub fn connectivity_mut(&mut self, key: &ExchangeId) -> &mut ConnectivityState {
self.exchanges
.get_mut(key)
.unwrap_or_else(|| panic!("ConnectivityStates does not contain: {key}"))
}
pub fn exchange_ids(&self) -> impl Iterator<Item = &ExchangeId> {
self.exchanges.keys()
}
pub fn exchange_states(&self) -> impl Iterator<Item = &ConnectivityState> {
self.exchanges.values()
}
}
#[derive(
Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default, Deserialize, Serialize,
)]
pub enum Health {
Healthy,
#[default]
Reconnecting,
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default, Deserialize, Serialize)]
pub struct ConnectivityState {
pub market_data: Health,
pub account: Health,
}
impl ConnectivityState {
pub fn all_healthy(&self) -> bool {
self.market_data == Health::Healthy && self.account == Health::Healthy
}
}
pub fn generate_empty_indexed_connectivity_states(
instruments: &IndexedInstruments,
) -> ConnectivityStates {
ConnectivityStates {
global: Health::Reconnecting,
exchanges: instruments
.exchanges()
.iter()
.map(|exchange| (exchange.value, ConnectivityState::default()))
.collect(),
}
}