use crate::{
engine::{
Engine, Processor,
audit::{Auditor, context::EngineContext},
clock::EngineClock,
execution_tx::MultiExchangeTxMap,
run::{async_run, async_run_with_audit, sync_run, sync_run_with_audit},
state::{EngineState, builder::EngineStateBuilder, trading::TradingState},
},
error::BarterError,
execution::{
AccountStreamEvent,
builder::{ExecutionBuildFutures, ExecutionBuilder},
},
shutdown::SyncShutdown,
system::{System, SystemAuxillaryHandles, config::ExecutionConfig},
};
use barter_data::streams::reconnect::stream::ReconnectingStream;
use barter_execution::balance::Balance;
use barter_instrument::{
Keyed,
asset::{AssetIndex, ExchangeAsset, name::AssetNameInternal},
exchange::{ExchangeId, ExchangeIndex},
index::IndexedInstruments,
instrument::{Instrument, InstrumentIndex},
};
use barter_integration::{
FeedEnded, Terminal,
channel::{Channel, ChannelTxDroppable, mpsc_unbounded},
collection::snapshot::SnapUpdates,
};
use derive_more::Constructor;
use fnv::FnvHashMap;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, marker::PhantomData};
/// Selects how the `Engine` is driven once a [`SystemBuild`] is initialised.
///
/// The mode is read in `SystemBuild::init_internal`, where it decides which run function is
/// used and how the engine task is spawned on the Tokio runtime handle.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Default)]
pub enum EngineFeedMode {
/// Default mode. The engine is run with `sync_run` / `sync_run_with_audit` inside a task
/// spawned via `spawn_blocking`.
#[default]
Iterator,
/// The engine is run with `async_run` / `async_run_with_audit` inside a task spawned via
/// `spawn`.
Stream,
}
/// Selects whether the `Engine` publishes an audit stream when a [`SystemBuild`] is initialised.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Default)]
pub enum AuditMode {
/// An audit channel is created, the engine is run with one of the `*_with_audit` run
/// functions, and the resulting `System` carries `Some(SnapUpdates)` holding the engine's
/// audit snapshot plus the receiver of subsequent updates.
Enabled,
/// Default mode. No audit channel is created and the resulting `System` carries `None`
/// in its `audit` field.
#[default]
Disabled,
}
/// Required inputs for constructing a system via [`SystemBuilder`].
///
/// The `Constructor` derive provides a `new` associated function; NOTE(review): its
/// parameters are expected to follow the field declaration order below - confirm against
/// the `derive_more` documentation before reordering fields.
#[derive(Debug, Clone, PartialEq, PartialOrd, Constructor)]
pub struct SystemArgs<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData> {
/// Indexed instruments handed to both `ExecutionBuilder::new` and `EngineStateBuilder::new`.
pub instruments: &'a IndexedInstruments,
/// Execution configurations; each one is added to the `ExecutionBuilder` during
/// `SystemBuilder::build`.
pub executions: Vec<ExecutionConfig>,
/// Engine clock. It is cloned for every mock execution, sampled via `time()` for the engine
/// start time, and finally moved into the `Engine`.
pub clock: Clock,
/// Strategy passed to `Engine::new`.
pub strategy: Strategy,
/// Risk manager passed to `Engine::new`.
pub risk: Risk,
/// Market stream whose items are forwarded into the engine feed when the `SystemBuild`
/// is initialised.
pub market_stream: MarketStream,
/// Global data passed to `EngineStateBuilder::new`.
pub global_data: GlobalData,
/// Function passed to `EngineStateBuilder::new`; `SystemBuilder::build` requires it to map a
/// keyed instrument reference to an `InstrumentData` value.
pub instrument_data_init: FnInstrumentData,
}
/// Builder that combines the required [`SystemArgs`] with optional settings and produces a
/// [`SystemBuild`].
#[derive(Debug)]
pub struct SystemBuilder<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData> {
// Required inputs supplied to `SystemBuilder::new`.
args: SystemArgs<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData>,
// Optional; `build` falls back to `EngineFeedMode::default()` (`Iterator`) when unset.
engine_feed_mode: Option<EngineFeedMode>,
// Optional; `build` falls back to `AuditMode::default()` (`Disabled`) when unset.
audit_mode: Option<AuditMode>,
// Optional; `build` falls back to `TradingState::default()` when unset.
trading_state: Option<TradingState>,
// Initial balances keyed by exchange asset; forwarded to `EngineStateBuilder::balances`.
balances: FnvHashMap<ExchangeAsset<AssetNameInternal>, Balance>,
}
impl<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData>
    SystemBuilder<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData>
{
    /// Start a builder from the required [`SystemArgs`].
    ///
    /// Every optional setting begins unset and the initial balances begin empty.
    pub fn new(
        config: SystemArgs<'a, Clock, Strategy, Risk, MarketStream, GlobalData, FnInstrumentData>,
    ) -> Self {
        Self {
            balances: FnvHashMap::default(),
            trading_state: None,
            audit_mode: None,
            engine_feed_mode: None,
            args: config,
        }
    }

    /// Set the [`EngineFeedMode`]; left unset, `build` uses the enum's default.
    pub fn engine_feed_mode(mut self, value: EngineFeedMode) -> Self {
        self.engine_feed_mode = Some(value);
        self
    }

    /// Set the [`AuditMode`]; left unset, `build` uses the enum's default.
    pub fn audit_mode(mut self, value: AuditMode) -> Self {
        self.audit_mode = Some(value);
        self
    }

    /// Set the initial `TradingState`; left unset, `build` uses `TradingState::default()`.
    pub fn trading_state(mut self, value: TradingState) -> Self {
        self.trading_state = Some(value);
        self
    }

    /// Add initial balances, keyed by exchange asset.
    ///
    /// May be called repeatedly: entries accumulate, and an entry whose key is already
    /// present replaces the previously stored balance.
    pub fn balances<BalanceIter, KeyedBalance>(mut self, balances: BalanceIter) -> Self
    where
        BalanceIter: IntoIterator<Item = KeyedBalance>,
        KeyedBalance: Into<Keyed<ExchangeAsset<AssetNameInternal>, Balance>>,
    {
        let entries = balances.into_iter().map(|item| {
            let keyed: Keyed<ExchangeAsset<AssetNameInternal>, Balance> = item.into();
            (keyed.key, keyed.value)
        });
        self.balances.extend(entries);
        self
    }

    /// Consume the builder and assemble a [`SystemBuild`].
    ///
    /// Steps, in order:
    /// 1. add every `ExecutionConfig` to an `ExecutionBuilder` and build it,
    /// 2. build the `EngineState` (start time sampled from the clock, trading state and
    ///    balances applied),
    /// 3. construct the `Engine` and package it with the feed/audit modes, the market stream
    ///    and the execution channel/futures.
    ///
    /// # Errors
    /// Returns a [`BarterError`] as soon as adding an execution configuration fails; later
    /// configurations are not processed.
    pub fn build<Event, InstrumentData>(
        self,
    ) -> Result<
        SystemBuild<
            Engine<
                Clock,
                EngineState<GlobalData, InstrumentData>,
                MultiExchangeTxMap,
                Strategy,
                Risk,
            >,
            Event,
            MarketStream,
        >,
        BarterError,
    >
    where
        Clock: EngineClock + Clone + Send + Sync + 'static,
        FnInstrumentData: Fn(
            &'a Keyed<InstrumentIndex, Instrument<Keyed<ExchangeIndex, ExchangeId>, AssetIndex>>,
        ) -> InstrumentData,
    {
        let Self {
            args,
            engine_feed_mode,
            audit_mode,
            trading_state,
            balances,
        } = self;
        let SystemArgs {
            instruments,
            executions,
            clock,
            strategy,
            risk,
            market_stream,
            global_data,
            instrument_data_init,
        } = args;

        // Register each configured execution in turn, stopping at the first failure.
        let mut execution_builder = ExecutionBuilder::new(instruments);
        for config in executions {
            execution_builder = match config {
                ExecutionConfig::Mock(mock_config) => {
                    execution_builder.add_mock(mock_config, clock.clone())?
                }
            };
        }
        let execution = execution_builder.build();

        // Lazily re-wrap the stored (key, value) pairs as `Keyed` for the state builder.
        let initial_balances = balances
            .into_iter()
            .map(|(asset, balance)| Keyed::new(asset, balance));

        let state = EngineStateBuilder::new(instruments, global_data, instrument_data_init)
            .time_engine_start(clock.time())
            .trading_state(trading_state.unwrap_or_default())
            .balances(initial_balances)
            .build();

        Ok(SystemBuild {
            engine: Engine::new(clock, state, execution.execution_tx_map, strategy, risk),
            engine_feed_mode: engine_feed_mode.unwrap_or_default(),
            audit_mode: audit_mode.unwrap_or_default(),
            market_stream,
            account_channel: execution.account_channel,
            execution_build_futures: execution.futures,
            phantom_event: PhantomData,
        })
    }
}
/// Fully assembled but not yet running system: an engine plus everything needed to start it.
///
/// Produced by `SystemBuilder::build` (or `SystemBuild::new`) and turned into a running
/// `System` by `init` / `init_with_runtime`.
// NOTE(review): `Debug` is deliberately not derived - presumably because one of the field
// types (e.g. `ExecutionBuildFutures`) does not implement it; confirm before removing the allow.
#[allow(missing_debug_implementations)]
pub struct SystemBuild<Engine, Event, MarketStream> {
/// Engine that will be run once the system is initialised.
pub engine: Engine,
/// How the engine is driven (blocking sync run vs. async run).
pub engine_feed_mode: EngineFeedMode,
/// Whether an audit channel is created when the system is initialised.
pub audit_mode: AuditMode,
/// Market stream whose items are forwarded into the engine feed.
pub market_stream: MarketStream,
/// Channel of account events; its receiver is turned into a stream and forwarded into the
/// engine feed.
pub account_channel: Channel<AccountStreamEvent>,
/// Pending execution initialisation, awaited during `init` before any task is spawned.
pub execution_build_futures: ExecutionBuildFutures,
// Marker tying the build to the engine's `Event` type without storing a value of it.
phantom_event: PhantomData<Event>,
}
impl<Engine, Event, MarketStream> SystemBuild<Engine, Event, MarketStream>
where
Engine: Processor<Event>
+ Auditor<Engine::Audit, Context = EngineContext>
+ SyncShutdown
+ Send
+ 'static,
Engine::Audit: From<FeedEnded> + Terminal + Debug + Clone + Send + 'static,
Event: From<MarketStream::Item> + From<AccountStreamEvent> + Debug + Clone + Send + 'static,
MarketStream: Stream + Send + 'static,
{
pub fn new(
engine: Engine,
engine_feed_mode: EngineFeedMode,
audit_mode: AuditMode,
market_stream: MarketStream,
account_channel: Channel<AccountStreamEvent>,
execution_build_futures: ExecutionBuildFutures,
) -> Self {
Self {
engine,
engine_feed_mode,
audit_mode,
market_stream,
account_channel,
execution_build_futures,
phantom_event: Default::default(),
}
}
pub async fn init(self) -> Result<System<Engine, Event>, BarterError> {
self.init_internal(tokio::runtime::Handle::current()).await
}
pub async fn init_with_runtime(
self,
runtime: tokio::runtime::Handle,
) -> Result<System<Engine, Event>, BarterError> {
self.init_internal(runtime).await
}
async fn init_internal(
self,
runtime: tokio::runtime::Handle,
) -> Result<System<Engine, Event>, BarterError> {
let Self {
mut engine,
engine_feed_mode,
audit_mode,
market_stream,
account_channel,
execution_build_futures,
phantom_event: _,
} = self;
let execution = execution_build_futures
.init_with_runtime(runtime.clone())
.await?;
let (feed_tx, mut feed_rx) = mpsc_unbounded();
let market_to_engine = runtime
.clone()
.spawn(market_stream.forward_to(feed_tx.clone()));
let account_stream = account_channel.rx.into_stream();
let account_to_engine = runtime.spawn(account_stream.forward_to(feed_tx.clone()));
let (engine, audit) = match (engine_feed_mode, audit_mode) {
(EngineFeedMode::Iterator, AuditMode::Enabled) => {
let (audit_tx, audit_rx) = mpsc_unbounded();
let mut audit_tx = ChannelTxDroppable::new(audit_tx);
let audit = SnapUpdates {
snapshot: engine.audit_snapshot(),
updates: audit_rx,
};
let handle = runtime.spawn_blocking(move || {
let shutdown_audit =
sync_run_with_audit(&mut feed_rx, &mut engine, &mut audit_tx);
(engine, shutdown_audit)
});
(handle, Some(audit))
}
(EngineFeedMode::Iterator, AuditMode::Disabled) => {
let handle = runtime.spawn_blocking(move || {
let shutdown_audit = sync_run(&mut feed_rx, &mut engine);
(engine, shutdown_audit)
});
(handle, None)
}
(EngineFeedMode::Stream, AuditMode::Enabled) => {
let (audit_tx, audit_rx) = mpsc_unbounded();
let mut audit_tx = ChannelTxDroppable::new(audit_tx);
let audit = SnapUpdates {
snapshot: engine.audit_snapshot(),
updates: audit_rx,
};
let handle = runtime.spawn(async move {
let shutdown_audit =
async_run_with_audit(&mut feed_rx, &mut engine, &mut audit_tx).await;
(engine, shutdown_audit)
});
(handle, Some(audit))
}
(EngineFeedMode::Stream, AuditMode::Disabled) => {
let handle = runtime.spawn(async move {
let shutdown_audit = async_run(&mut feed_rx, &mut engine).await;
(engine, shutdown_audit)
});
(handle, None)
}
};
Ok(System {
engine,
handles: SystemAuxillaryHandles {
execution,
market_to_engine,
account_to_engine,
},
feed_tx,
audit,
})
}
}