use crate::{
EngineEvent, Sequence,
engine::{
action::{
ActionOutput,
cancel_orders::CancelOrders,
close_positions::ClosePositions,
generate_algo_orders::{GenerateAlgoOrders, GenerateAlgoOrdersOutput},
send_requests::SendRequests,
},
audit::{AuditTick, Auditor, EngineAudit, ProcessAudit, context::EngineContext},
clock::EngineClock,
command::Command,
execution_tx::ExecutionTxMap,
state::{
EngineState, instrument::data::InstrumentDataState,
order::in_flight_recorder::InFlightRequestRecorder, position::PositionExited,
trading::TradingState,
},
},
execution::{AccountStreamEvent, request::ExecutionRequest},
risk::RiskManager,
shutdown::SyncShutdown,
statistic::summary::TradingSummaryGenerator,
strategy::{
algo::AlgoStrategy, close_positions::ClosePositionsStrategy,
on_disconnect::OnDisconnectStrategy, on_trading_disabled::OnTradingDisabled,
},
};
use barter_data::{event::MarketEvent, streams::consumer::MarketStreamEvent};
use barter_execution::AccountEvent;
use barter_instrument::{asset::QuoteAsset, exchange::ExchangeIndex, instrument::InstrumentIndex};
use barter_integration::channel::Tx;
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::info;
/// Engine actions; this file imports its `cancel_orders`, `close_positions`,
/// `generate_algo_orders` and `send_requests` submodules.
pub mod action;
/// Audit types produced when the engine processes events (`AuditTick`, `Auditor`,
/// `EngineAudit`, `ProcessAudit`, and `context::EngineContext`).
pub mod audit;
/// Defines the `EngineClock` trait that this file uses to read the engine time.
pub mod clock;
/// Defines the `Command` enum that is actioned by `Engine::action`.
pub mod command;
// NOTE(review): nothing from `error` is referenced in this file - see the module itself.
pub mod error;
/// Defines `ExecutionTxMap`, the collection of execution transmitters held by the `Engine`.
pub mod execution_tx;
/// Engine state types used by this file (`EngineState`, `TradingState`, and the instrument,
/// order and position state submodules).
pub mod state;
// NOTE(review): nothing from `run` is referenced in this file - see the module itself.
pub mod run;
/// Something that can consume an `Event` and report what it did as an `Audit`.
///
/// The trait is generic over `Event`, so one type may implement it several times for
/// different event types (this file bounds types on both owned and borrowed events).
pub trait Processor<Event> {
/// Value returned from [`Processor::process`] describing the outcome of processing the event.
type Audit;
/// Processes the provided `event`, possibly mutating `self`, and returns the resulting audit.
fn process(&mut self, event: Event) -> Self::Audit;
}
/// Processes `event` with the `engine` and wraps the outcome in an [`AuditTick`].
///
/// The event is first handed to [`Processor::process`]; the audit value it returns is then
/// passed to [`Auditor::audit`], whose result is returned to the caller.
pub fn process_with_audit<Event, Engine>(
    engine: &mut Engine,
    event: Event,
) -> AuditTick<Engine::Audit, EngineContext>
where
    Engine: Processor<Event> + Auditor<Engine::Audit, Context = EngineContext>,
{
    // The two calls are kept as separate statements: each needs its own exclusive borrow of
    // the engine, so they cannot be nested into a single expression.
    let processed = engine.process(event);
    engine.audit(processed)
}
/// Trading engine, generic over its clock, state, execution transmitters, strategy and risk
/// manager.
///
/// No trait bounds are placed on the struct itself; the `impl` blocks in this file add the
/// bounds each method needs.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Engine<Clock, State, ExecutionTxs, Strategy, Risk> {
/// Clock used to determine the current engine time (read via `EngineClock::time`).
pub clock: Clock,
/// Metadata about this engine instance (start time and sequence).
pub meta: EngineMeta,
/// Engine state; the event-processing impls in this file use `EngineState`.
pub state: State,
/// Transmitters used to send `ExecutionRequest`s (for example on shutdown).
pub execution_txs: ExecutionTxs,
/// Strategy providing the hooks invoked by the engine (algo orders, close positions,
/// on-disconnect and on-trading-disabled logic).
pub strategy: Strategy,
/// Risk manager; bounded by `RiskManager` in the processing impls.
pub risk: Risk,
}
/// Metadata describing an [`Engine`] instance.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub struct EngineMeta {
/// Engine clock time captured by `Engine::new`, and again by `Engine::reset_metadata`.
pub time_start: DateTime<Utc>,
/// Sequence value; initialised to `Sequence(0)` by `Engine::new` and `Engine::reset_metadata`.
// NOTE(review): where the sequence is advanced is not visible in this file.
pub sequence: Sequence,
}
/// [`Processor`] implementation that drives the `Engine` with a single [`EngineEvent`].
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
    Processor<EngineEvent<InstrumentData::MarketEventKind>>
    for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
    Clock: EngineClock + for<'a> Processor<&'a EngineEvent<InstrumentData::MarketEventKind>>,
    InstrumentData: InstrumentDataState,
    GlobalData: for<'a> Processor<&'a AccountEvent>
        + for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
    ExecutionTxs: ExecutionTxMap<ExchangeIndex, InstrumentIndex>,
    Strategy: OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
        + OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
        + AlgoStrategy<State = EngineState<GlobalData, InstrumentData>>
        + ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
    Risk: RiskManager<State = EngineState<GlobalData, InstrumentData>>,
{
    type Audit = EngineAudit<
        EngineEvent<InstrumentData::MarketEventKind>,
        EngineOutput<Strategy::OnTradingDisabled, Strategy::OnDisconnect>,
    >;

    /// Processes one [`EngineEvent`] and returns an audit of the work performed.
    ///
    /// Steps, in order:
    /// 1. The clock is given the event.
    /// 2. The event is handled according to its variant:
    ///    * `Shutdown` returns immediately, doing no further work.
    ///    * `Command` is actioned; if the action output reports unrecoverable errors the audit
    ///      is returned immediately with both the output and the errors.
    ///    * `TradingStateUpdate`, `Account` and `Market` update the engine state.
    /// 3. If trading is enabled afterwards, algo orders are generated:
    ///    * an empty output leaves the audit unchanged,
    ///    * unrecoverable errors are added to the audit (the algo output itself is not added in
    ///      that case),
    ///    * otherwise the algo output is added to the audit.
    fn process(&mut self, event: EngineEvent<InstrumentData::MarketEventKind>) -> Self::Audit {
        self.clock.process(&event);

        let audit = match &event {
            EngineEvent::Shutdown(_) => return EngineAudit::process(event),
            EngineEvent::Command(command) => {
                let commanded = self.action(command);

                match commanded.unrecoverable_errors() {
                    Some(errors) => {
                        return EngineAudit::process_with_output_and_errs(
                            event, errors, commanded,
                        );
                    }
                    None => ProcessAudit::with_output(event, commanded),
                }
            }
            EngineEvent::TradingStateUpdate(trading_state) => {
                let on_disabled = self.update_from_trading_state_update(*trading_state);
                ProcessAudit::with_trading_state_update(event, on_disabled)
            }
            EngineEvent::Account(account) => {
                let account_output = self.update_from_account_stream(account);
                ProcessAudit::with_account_update(event, account_output)
            }
            EngineEvent::Market(market) => {
                let market_output = self.update_from_market_stream(market);
                ProcessAudit::with_market_update(event, market_output)
            }
        };

        // Algo orders are only generated while trading is enabled.
        if !matches!(self.state.trading, TradingState::Enabled) {
            return EngineAudit::from(audit);
        }

        let algo_output = self.generate_algo_orders();
        if algo_output.is_empty() {
            return EngineAudit::from(audit);
        }

        match algo_output.unrecoverable_errors() {
            Some(errors) => EngineAudit::Process(audit.add_errors(errors)),
            None => EngineAudit::from(audit.add_output(algo_output)),
        }
    }
}
/// [`SyncShutdown`] implementation that notifies every execution transmitter of the shutdown.
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk> SyncShutdown
    for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
    ExecutionTxs: ExecutionTxMap,
{
    type Result = ();

    /// Sends [`ExecutionRequest::Shutdown`] on each transmitter yielded by
    /// `self.execution_txs.iter()`.
    ///
    /// Send results are deliberately ignored: a failed send does not stop the remaining
    /// transmitters from being notified.
    fn shutdown(&mut self) -> Self::Result {
        for execution_tx in self.execution_txs.iter() {
            let _ignored = execution_tx.send(ExecutionRequest::Shutdown);
        }
    }
}
/// Methods available when the `Engine` state is an [`EngineState`].
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
    Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
{
    /// Actions the provided user [`Command`] and returns the resulting [`ActionOutput`].
    ///
    /// * `SendCancelRequests` / `SendOpenRequests`: the requests are cloned and sent, and the
    ///   requests reported as `sent` are recorded in the engine state as in-flight.
    /// * `ClosePositions` / `CancelOrders`: delegated to `close_positions` / `cancel_orders`
    ///   with the provided filter.
    ///
    /// Each command is logged at `info` level before being actioned.
    pub fn action(&mut self, command: &Command) -> ActionOutput
    where
        InstrumentData: InFlightRequestRecorder,
        ExecutionTxs: ExecutionTxMap,
        Strategy: ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
        Risk: RiskManager,
    {
        match command {
            Command::SendCancelRequests(requests) => {
                info!(
                    ?requests,
                    "Engine actioning user Command::SendCancelRequests"
                );
                let cancels = self.send_requests(requests.clone());
                self.state.record_in_flight_cancels(&cancels.sent);
                ActionOutput::CancelOrders(cancels)
            }
            Command::SendOpenRequests(requests) => {
                info!(?requests, "Engine actioning user Command::SendOpenRequests");
                let opens = self.send_requests(requests.clone());
                self.state.record_in_flight_opens(&opens.sent);
                ActionOutput::OpenOrders(opens)
            }
            Command::ClosePositions(filter) => {
                info!(?filter, "Engine actioning user Command::ClosePositions");
                let closed = self.close_positions(filter);
                ActionOutput::ClosePositions(closed)
            }
            Command::CancelOrders(filter) => {
                info!(?filter, "Engine actioning user Command::CancelOrders");
                let cancelled = self.cancel_orders(filter);
                ActionOutput::CancelOrders(cancelled)
            }
        }
    }

    /// Applies a [`TradingState`] update to the engine state.
    ///
    /// Returns `Some` with the output of `Strategy::on_trading_disabled` if the update
    /// transitioned trading to disabled, otherwise `None`.
    pub fn update_from_trading_state_update(
        &mut self,
        update: TradingState,
    ) -> Option<Strategy::OnTradingDisabled>
    where
        Strategy:
            OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        let became_disabled = self
            .state
            .trading
            .update(update)
            .transitioned_to_disabled();

        if became_disabled {
            Some(Strategy::on_trading_disabled(self))
        } else {
            None
        }
    }

    /// Updates the engine from an [`AccountStreamEvent`].
    ///
    /// * `Reconnecting`: the connectivity state is updated and the output of
    ///   `Strategy::on_disconnect` is returned as `UpdateFromAccountOutput::OnDisconnect`.
    /// * `Item`: the engine state is updated from the account event; a value returned by that
    ///   update is wrapped in `UpdateFromAccountOutput::PositionExit`, otherwise
    ///   `UpdateFromAccountOutput::None` is returned.
    pub fn update_from_account_stream(
        &mut self,
        event: &AccountStreamEvent,
    ) -> UpdateFromAccountOutput<Strategy::OnDisconnect>
    where
        InstrumentData: for<'a> Processor<&'a AccountEvent>,
        GlobalData: for<'a> Processor<&'a AccountEvent>,
        Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        match event {
            AccountStreamEvent::Reconnecting(exchange) => {
                self.state
                    .connectivity
                    .update_from_account_reconnecting(exchange);

                let on_disconnect = Strategy::on_disconnect(self, *exchange);
                UpdateFromAccountOutput::OnDisconnect(on_disconnect)
            }
            AccountStreamEvent::Item(account_event) => {
                self.state.update_from_account(account_event).map_or(
                    UpdateFromAccountOutput::None,
                    UpdateFromAccountOutput::PositionExit,
                )
            }
        }
    }

    /// Updates the engine from a [`MarketStreamEvent`].
    ///
    /// * `Reconnecting`: the connectivity state is updated and the output of
    ///   `Strategy::on_disconnect` is returned as `UpdateFromMarketOutput::OnDisconnect`.
    /// * `Item`: the engine state is updated from the market event and
    ///   `UpdateFromMarketOutput::None` is returned.
    pub fn update_from_market_stream(
        &mut self,
        event: &MarketStreamEvent<InstrumentIndex, InstrumentData::MarketEventKind>,
    ) -> UpdateFromMarketOutput<Strategy::OnDisconnect>
    where
        InstrumentData: InstrumentDataState,
        GlobalData:
            for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
        Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        match event {
            MarketStreamEvent::Reconnecting(exchange) => {
                self.state
                    .connectivity
                    .update_from_market_reconnecting(exchange);

                let on_disconnect = Strategy::on_disconnect(self, *exchange);
                UpdateFromMarketOutput::OnDisconnect(on_disconnect)
            }
            MarketStreamEvent::Item(market_event) => {
                self.state.update_from_market(market_event);
                UpdateFromMarketOutput::None
            }
        }
    }

    /// Initialises a [`TradingSummaryGenerator`] from the current engine state.
    ///
    /// The generator is given `risk_free_return`, the engine start time from the metadata, the
    /// current engine time, and references to the instrument and asset states.
    pub fn trading_summary_generator(&self, risk_free_return: Decimal) -> TradingSummaryGenerator
    where
        Clock: EngineClock,
    {
        let started_at = self.meta.time_start;
        let now = self.time();
        let instruments = &self.state.instruments;
        let assets = &self.state.assets;

        TradingSummaryGenerator::init(risk_free_return, started_at, now, instruments, assets)
    }
}
/// Construction and time-related helpers, available for any `Engine` whose clock implements
/// [`EngineClock`].
impl<Clock, State, ExecutionTxs, Strategy, Risk> Engine<Clock, State, ExecutionTxs, Strategy, Risk>
where
    Clock: EngineClock,
{
    /// Constructs a new `Engine` from its components.
    ///
    /// The metadata start time is read from the provided `clock`, and the sequence starts at
    /// `Sequence(0)`.
    pub fn new(
        clock: Clock,
        state: State,
        execution_txs: ExecutionTxs,
        strategy: Strategy,
        risk: Risk,
    ) -> Self {
        // Read the clock before it is moved into the engine.
        let meta = EngineMeta {
            time_start: clock.time(),
            sequence: Sequence(0),
        };

        Self {
            clock,
            meta,
            state,
            execution_txs,
            strategy,
            risk,
        }
    }

    /// Returns the current time according to the engine clock.
    pub fn time(&self) -> DateTime<Utc> {
        self.clock.time()
    }

    /// Resets the engine metadata: the start time becomes the current clock time and the
    /// sequence returns to `Sequence(0)`.
    pub fn reset_metadata(&mut self) {
        self.meta = EngineMeta {
            time_start: self.clock.time(),
            sequence: Sequence(0),
        };
    }
}
/// Output that can be produced by the `Engine` while processing an event.
///
/// This is the output type carried by the `EngineAudit` returned from the `Engine`'s
/// `Processor` implementation. `ExchangeKey` and `InstrumentKey` default to `ExchangeIndex`
/// and `InstrumentIndex`.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum EngineOutput<
OnTradingDisabled,
OnDisconnect,
ExchangeKey = ExchangeIndex,
InstrumentKey = InstrumentIndex,
> {
/// Output of actioning a `Command` (see the `From<ActionOutput>` impl).
Commanded(ActionOutput<ExchangeKey, InstrumentKey>),
/// Strategy-defined `OnTradingDisabled` output.
OnTradingDisabled(OnTradingDisabled),
/// Strategy-defined `OnDisconnect` output.
// NOTE(review): presumably produced for account stream disconnects - constructed outside this file.
AccountDisconnect(OnDisconnect),
/// A position was exited (see the `From<PositionExited>` impl).
PositionExit(PositionExited<QuoteAsset, InstrumentKey>),
/// Strategy-defined `OnDisconnect` output.
// NOTE(review): presumably produced for market stream disconnects - constructed outside this file.
MarketDisconnect(OnDisconnect),
/// Output of generating algo orders (see the `From<GenerateAlgoOrdersOutput>` impl).
AlgoOrders(GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>),
}
/// Output of a trading state update: either nothing, or a strategy-defined `OnTradingDisabled`
/// value.
// NOTE(review): not constructed in this file; `Engine::update_from_trading_state_update`
// returns an `Option` instead - confirm where this type is used.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateTradingStateOutput<OnTradingDisabled> {
/// No output.
None,
/// Strategy-defined `OnTradingDisabled` output.
OnTradingDisabled(OnTradingDisabled),
}
/// Output of `Engine::update_from_account_stream`.
///
/// `InstrumentKey` defaults to `InstrumentIndex`.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateFromAccountOutput<OnDisconnect, InstrumentKey = InstrumentIndex> {
/// The account event was applied to the state and produced no position exit.
None,
/// The account stream is reconnecting; holds the output of `Strategy::on_disconnect`.
OnDisconnect(OnDisconnect),
/// Applying the account event to the state produced a `PositionExited`.
PositionExit(PositionExited<QuoteAsset, InstrumentKey>),
}
/// Output of `Engine::update_from_market_stream`.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateFromMarketOutput<OnDisconnect> {
/// The market event was applied to the state; there is nothing further to report.
None,
/// The market stream is reconnecting; holds the output of `Strategy::on_disconnect`.
OnDisconnect(OnDisconnect),
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<ActionOutput<ExchangeKey, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: ActionOutput<ExchangeKey, InstrumentKey>) -> Self {
Self::Commanded(value)
}
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<PositionExited<QuoteAsset, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: PositionExited<QuoteAsset, InstrumentKey>) -> Self {
Self::PositionExit(value)
}
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>) -> Self {
Self::AlgoOrders(value)
}
}