use crate::{
Timed, engine::state::asset::filter::AssetFilter,
statistic::summary::asset::TearSheetAssetGenerator,
};
use barter_execution::balance::{AssetBalance, Balance};
use barter_instrument::{
asset::{
Asset, AssetIndex, ExchangeAsset,
name::{AssetNameExchange, AssetNameInternal},
},
index::IndexedInstruments,
};
use barter_integration::collection::{FnvIndexMap, snapshot::Snapshot};
use chrono::Utc;
use derive_more::Constructor;
use itertools::Either;
use serde::{
Deserialize, Deserializer, Serialize, Serializer,
de::{SeqAccess, Visitor},
ser::SerializeSeq,
};
use std::fmt::{self, Debug};
pub mod filter;
/// Collection of every tracked [`AssetState`], keyed by `ExchangeAsset<AssetNameInternal>`.
///
/// Entries can be looked up either by key or by positional [`AssetIndex`]. The collection is
/// serialised as a sequence of `(key, state)` pairs rather than as a map.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct AssetStates(pub FnvIndexMap<ExchangeAsset<AssetNameInternal>, AssetState>);
impl Serialize for AssetStates {
    /// Writes the collection as a sequence of `(key, state)` pairs, in iteration order.
    ///
    /// The sequence length is announced up front from the number of stored entries.
    fn serialize<S: Serializer>(&self, serialiser: S) -> Result<S::Ok, S::Error> {
        let mut seq = serialiser.serialize_seq(Some(self.0.len()))?;
        // Stops at, and propagates, the first element that fails to serialise.
        self.0
            .iter()
            .try_for_each(|entry| seq.serialize_element(&entry))?;
        seq.end()
    }
}
impl<'de> Deserialize<'de> for AssetStates {
    /// Reads the collection from a sequence of `(key, state)` pairs, inserting them in the
    /// order they appear.
    ///
    /// If the same key appears more than once, the later state replaces the earlier one.
    fn deserialize<D: Deserializer<'de>>(deserialiser: D) -> Result<Self, D::Error> {
        /// Upper bound on the number of entries pre-allocated from the sequence size hint.
        ///
        /// The hint is supplied by the data format and may be derived from the input itself
        /// (e.g. a length prefix), so it is not trusted for an unbounded up-front allocation.
        /// Larger sequences are still accepted; the map simply grows as elements arrive.
        const MAX_PREALLOCATED_ENTRIES: usize = 4096;

        struct AssetStatesVisitor;

        impl<'de> Visitor<'de> for AssetStatesVisitor {
            type Value = AssetStates;

            fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "a sequence of (ExchangeAsset, AssetState) pairs")
            }

            fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
                let mut map = FnvIndexMap::default();

                // Pre-allocate from the hint, but never more than the cap.
                let capacity = seq
                    .size_hint()
                    .map_or(0, |hint| hint.min(MAX_PREALLOCATED_ENTRIES));
                map.reserve(capacity);

                while let Some((k, v)) = seq.next_element()? {
                    map.insert(k, v);
                }

                Ok(AssetStates(map))
            }
        }

        deserialiser.deserialize_seq(AssetStatesVisitor)
    }
}
impl AssetStates {
pub fn asset_index(&self, key: &AssetIndex) -> &AssetState {
self.0
.get_index(key.index())
.map(|(_key, state)| state)
.unwrap_or_else(|| panic!("AssetStates does not contain: {key}"))
}
pub fn asset_index_mut(&mut self, key: &AssetIndex) -> &mut AssetState {
self.0
.get_index_mut(key.index())
.map(|(_key, state)| state)
.unwrap_or_else(|| panic!("AssetStates does not contain: {key}"))
}
pub fn asset(&self, key: &ExchangeAsset<AssetNameInternal>) -> &AssetState {
self.0
.get(key)
.unwrap_or_else(|| panic!("AssetStates does not contain: {key:?}"))
}
pub fn asset_mut(&mut self, key: &ExchangeAsset<AssetNameInternal>) -> &mut AssetState {
self.0
.get_mut(key)
.unwrap_or_else(|| panic!("AssetStates does not contain: {key:?}"))
}
pub fn filtered<'a>(&'a self, filter: &'a AssetFilter) -> impl Iterator<Item = &'a AssetState> {
use filter::AssetFilter::*;
match filter {
None => Either::Left(self.assets()),
Exchanges(exchanges) => Either::Right(self.0.iter().filter_map(|(asset, state)| {
if exchanges.contains(&asset.exchange) {
Some(state)
} else {
Option::<&AssetState>::None
}
})),
}
}
pub fn assets(&self) -> impl Iterator<Item = &AssetState> {
self.0.values()
}
}
/// State tracked for a single asset: its identity, statistics generator and latest balance.
#[derive(Debug, Clone, PartialEq, PartialOrd, Deserialize, Serialize, Constructor)]
pub struct AssetState {
/// Asset this state describes.
pub asset: Asset,
/// Statistics generator, updated with every balance snapshot that is applied to this state.
pub statistics: TearSheetAssetGenerator,
/// Most recently applied balance together with its exchange timestamp; `None` until a
/// snapshot has been applied.
pub balance: Option<Timed<Balance>>,
}
impl AssetState {
    /// Applies a balance `snapshot` to this state.
    ///
    /// The snapshot is adopted when no balance has been recorded yet, or when its
    /// `time_exchange` is not older than the recorded timestamp (an equal timestamp is
    /// applied). Every adopted snapshot is also forwarded to `statistics`. A snapshot older
    /// than the recorded balance is ignored and leaves the state untouched.
    pub fn update_from_balance<AssetKey>(&mut self, snapshot: Snapshot<&AssetBalance<AssetKey>>) {
        let incoming = snapshot.0;

        if let Some(current) = self.balance.as_mut() {
            // The recorded balance is strictly newer: the snapshot is stale, drop it.
            if current.time > incoming.time_exchange {
                return;
            }
            current.time = incoming.time_exchange;
            current.value = incoming.balance;
        } else {
            // First snapshot ever seen for this asset.
            self.balance = Some(Timed::new(incoming.balance, incoming.time_exchange));
        }

        self.statistics.update_from_balance(snapshot);
    }
}
impl From<&AssetState> for AssetBalance<AssetNameExchange> {
    /// Builds an [`AssetBalance`] keyed by the asset's exchange name.
    ///
    /// When the state has no recorded balance, the result carries `Balance::default()`
    /// stamped with the current UTC time.
    fn from(
        // Exhaustive destructure: adding a field to `AssetState` forces this conversion to
        // be revisited.
        AssetState {
            asset,
            statistics: _,
            balance,
        }: &AssetState,
    ) -> Self {
        let (balance, time_exchange) = balance.as_ref().map_or_else(
            || (Balance::default(), Utc::now()),
            |timed| (timed.value, timed.time),
        );

        Self {
            asset: asset.name_exchange.clone(),
            balance,
            time_exchange,
        }
    }
}
pub fn generate_empty_indexed_asset_states(instruments: &IndexedInstruments) -> AssetStates {
AssetStates(
instruments
.assets()
.iter()
.map(|asset| {
(
ExchangeAsset::new(
asset.value.exchange,
asset.value.asset.name_internal.clone(),
),
AssetState::new(
asset.value.asset.clone(),
TearSheetAssetGenerator::default(),
None,
),
)
})
.collect(),
)
}
// Unit tests for `AssetState::update_from_balance` timestamp handling and for the
// `AssetStates` serde round trip.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::asset_state;
use barter_instrument::{asset::name::AssetNameExchange, exchange::ExchangeId};
use chrono::{DateTime, TimeZone, Utc};
use rust_decimal_macros::dec;
// A state with no recorded balance adopts the first snapshot it receives.
#[test]
fn test_update_from_balance_with_first_ever_snapshot() {
let mut state = AssetState {
asset: Asset {
name_internal: AssetNameInternal::new("btc"),
name_exchange: AssetNameExchange::new("btc"),
},
statistics: Default::default(),
balance: None,
};
let snapshot = Snapshot(AssetBalance {
asset: Asset {
name_internal: AssetNameInternal::new("btc"),
name_exchange: AssetNameExchange::new("btc"),
},
balance: Balance {
total: dec!(1100.0),
free: dec!(1100.0),
},
time_exchange: DateTime::<Utc>::MIN_UTC,
});
state.update_from_balance(snapshot.as_ref());
let expected = asset_state("btc", 1100.0, 1100.0, DateTime::<Utc>::MIN_UTC);
assert_eq!(state, expected)
}
// A snapshot with a later timestamp than the recorded balance replaces it.
#[test]
fn test_update_from_balance_with_more_recent_snapshot() {
let mut state = asset_state("btc", 1000.0, 1000.0, DateTime::<Utc>::MIN_UTC);
let snapshot = Snapshot(AssetBalance {
asset: Asset {
name_internal: AssetNameInternal::new("btc"),
name_exchange: AssetNameExchange::new("xbt"),
},
balance: Balance {
total: dec!(1100.0),
free: dec!(1100.0),
},
time_exchange: DateTime::<Utc>::MAX_UTC,
});
state.update_from_balance(snapshot.as_ref());
let expected = asset_state("btc", 1100.0, 1100.0, DateTime::<Utc>::MAX_UTC);
assert_eq!(state, expected)
}
// A snapshot carrying the same timestamp as the recorded balance is still applied:
// `free` moves from 900 to 800 while the timestamp stays unchanged.
#[test]
fn test_update_from_balance_with_equal_timestamp() {
let time = Utc.timestamp_opt(1000, 0).unwrap();
let mut state = asset_state("btc", 1000.0, 900.0, time);
let snapshot = Snapshot(AssetBalance {
asset: Asset {
name_internal: AssetNameInternal::new("btc"),
name_exchange: AssetNameExchange::new("xbt"),
},
balance: Balance {
total: dec!(1000.0),
free: dec!(800.0),
},
time_exchange: time,
});
state.update_from_balance(snapshot.as_ref());
assert_eq!(state.balance.unwrap().value.total, dec!(1000.0));
assert_eq!(state.balance.unwrap().value.free, dec!(800.0));
assert_eq!(state.balance.unwrap().time, time);
}
// Serialising to JSON and back yields an equal collection in which both positional
// (`AssetIndex`) and key-based lookups resolve to the original states.
#[test]
fn test_asset_states_serde_round_trip_preserves_index_and_key_lookup() {
let btc_key = ExchangeAsset::new(ExchangeId::BinanceSpot, AssetNameInternal::new("btc"));
let usdt_key = ExchangeAsset::new(ExchangeId::BinanceSpot, AssetNameInternal::new("usdt"));
let btc_state = asset_state("btc", 1.0, 0.5, DateTime::<Utc>::MIN_UTC);
let usdt_state = asset_state("usdt", 1000.0, 1000.0, DateTime::<Utc>::MIN_UTC);
let original = AssetStates(
[
(btc_key.clone(), btc_state.clone()),
(usdt_key.clone(), usdt_state.clone()),
]
.into_iter()
.collect(),
);
let json = serde_json::to_string(&original).expect("serialisation failed");
let restored: AssetStates = serde_json::from_str(&json).expect("deserialisation failed");
assert_eq!(original, restored);
assert_eq!(restored.asset_index(&AssetIndex(0)), &btc_state);
assert_eq!(restored.asset_index(&AssetIndex(1)), &usdt_state);
assert_eq!(restored.asset(&btc_key), &btc_state);
assert_eq!(restored.asset(&usdt_key), &usdt_state);
}
// A snapshot older than the recorded balance is ignored and the state is left unchanged.
#[test]
fn test_update_from_balance_with_stale_snapshot() {
let mut state = asset_state("btc", 1000.0, 900.0, DateTime::<Utc>::MAX_UTC);
let snapshot = Snapshot(AssetBalance {
asset: Asset {
name_internal: AssetNameInternal::new("btc"),
name_exchange: AssetNameExchange::new("xbt"),
},
balance: Balance {
total: dec!(1000.0),
free: dec!(800.0),
},
time_exchange: DateTime::<Utc>::MIN_UTC,
});
state.update_from_balance(snapshot.as_ref());
let expected = asset_state("btc", 1000.0, 900.0, DateTime::<Utc>::MAX_UTC);
assert_eq!(state, expected)
}
}