use crate::{
engine::{
Processor,
audit::{AuditTick, Auditor, context::EngineContext},
command::Command,
state::{instrument::filter::InstrumentFilter, trading::TradingState},
},
execution::builder::ExecutionHandles,
shutdown::{AsyncShutdown, Shutdown},
};
use barter_execution::order::request::{OrderRequestCancel, OrderRequestOpen};
use barter_integration::{
channel::{Tx, UnboundedRx, UnboundedTx},
collection::{one_or_many::OneOrMany, snapshot::SnapUpdates},
};
use std::fmt::Debug;
use tokio::task::{JoinError, JoinHandle};
pub mod builder;
pub mod config;
/// Handle to a spawned Engine task together with its auxiliary tasks and channels.
///
/// The inherent methods on this type push events to the Engine through `feed_tx` and
/// join or abort the tasks referenced by `engine` and `handles`.
#[allow(missing_debug_implementations)]
pub struct System<Engine, Event>
where
Engine: Processor<Event> + Auditor<Engine::Audit, Context = EngineContext>,
{
/// Join handle of the Engine task; resolves to the `Engine` and an `Engine::Audit` value
/// (bound as the "shutdown audit" by the shutdown methods).
pub engine: JoinHandle<(Engine, Engine::Audit)>,
/// Join handles of the auxiliary tasks (execution, market-to-engine, account-to-engine).
pub handles: SystemAuxillaryHandles,
/// Sender side of the feed channel; every event and command method sends through it.
pub feed_tx: UnboundedTx<Event>,
/// Optional audit snapshot plus stream of audit updates; `take_audit` moves it out and
/// leaves `None` behind.
pub audit:
Option<SnapUpdates<AuditTick<Engine::Snapshot>, UnboundedRx<AuditTick<Engine::Audit>>>>,
}
impl<Engine, Event> System<Engine, Event>
where
    Engine: Processor<Event> + Auditor<Engine::Audit, Context = EngineContext>,
    Event: Debug + Clone + Send,
{
    /// Shuts the `System` down gracefully.
    ///
    /// Sends a [`Shutdown`] event on the feed channel, waits for the Engine task to finish,
    /// then shuts down the auxiliary handles via [`AsyncShutdown::shutdown`].
    ///
    /// Returns the `Engine` and the `Engine::Audit` value yielded by the Engine task.
    ///
    /// # Errors
    /// Returns the [`JoinError`] of the Engine task if it fails to join; in that case the
    /// auxiliary tasks are aborted instead of being left detached. Otherwise returns any
    /// [`JoinError`] produced while shutting down the auxiliary handles.
    ///
    /// # Panics
    /// Panics if the [`Shutdown`] event cannot be sent on the feed channel.
    pub async fn shutdown(mut self) -> Result<(Engine, Engine::Audit), JoinError>
    where
        Event: From<Shutdown>,
    {
        self.send(Shutdown);

        match self.engine.await {
            Ok(output) => {
                self.handles.shutdown().await?;
                Ok(output)
            }
            Err(error) => {
                // Dropping a JoinHandle only detaches its task, so abort explicitly
                // before reporting the Engine failure.
                self.handles.abort();
                Err(error)
            }
        }
    }

    /// Shuts the Engine down and aborts every auxiliary task without waiting for them.
    ///
    /// Sends a [`Shutdown`] event on the feed channel, waits for the Engine task to finish,
    /// then calls [`SystemAuxillaryHandles::abort`].
    ///
    /// Returns the `Engine` and the `Engine::Audit` value yielded by the Engine task.
    ///
    /// # Errors
    /// Returns the [`JoinError`] of the Engine task if it fails to join. The auxiliary
    /// tasks are aborted in that case as well.
    ///
    /// # Panics
    /// Panics if the [`Shutdown`] event cannot be sent on the feed channel.
    pub async fn abort(self) -> Result<(Engine, Engine::Audit), JoinError>
    where
        Event: From<Shutdown>,
    {
        self.send(Shutdown);

        // Keep the join result so the auxiliary tasks are aborted on both outcomes.
        let joined = self.engine.await;
        self.handles.abort();

        joined
    }

    /// Shuts the `System` down once the market-to-engine task has run to completion.
    ///
    /// Steps, in order: await the `market_to_engine` task, send [`Shutdown`] on the feed
    /// channel and drop the sender, await the Engine task, abort the `account_to_engine`
    /// task, and finally shut down the execution handles.
    ///
    /// Returns the `Engine` and the `Engine::Audit` value yielded by the Engine task.
    ///
    /// # Errors
    /// Returns the [`JoinError`] of the first task that fails to join (market-to-engine,
    /// Engine, or execution shutdown). When the market-to-engine or Engine task fails, the
    /// remaining tasks are aborted before returning so none is left detached.
    ///
    /// # Panics
    /// Panics if the [`Shutdown`] event cannot be sent on the feed channel.
    pub async fn shutdown_after_backtest(self) -> Result<(Engine, Engine::Audit), JoinError>
    where
        Event: From<Shutdown>,
    {
        let Self {
            engine,
            handles:
                SystemAuxillaryHandles {
                    mut execution,
                    market_to_engine,
                    account_to_engine,
                },
            feed_tx,
            audit: _,
        } = self;

        // Only initiate the shutdown after the market-to-engine task has finished.
        if let Err(error) = market_to_engine.await {
            // The caller gets no handles back on this path, so tear down every
            // remaining task rather than detaching it.
            engine.abort();
            account_to_engine.abort();
            execution.into_iter().for_each(|handle| handle.abort());
            return Err(error);
        }

        feed_tx
            .send(Shutdown)
            .expect("Engine cannot drop Feed receiver");
        drop(feed_tx);

        let output = match engine.await {
            Ok(output) => output,
            Err(error) => {
                account_to_engine.abort();
                execution.into_iter().for_each(|handle| handle.abort());
                return Err(error);
            }
        };

        account_to_engine.abort();
        execution.shutdown().await?;

        Ok(output)
    }

    /// Sends a [`Command::SendCancelRequests`] carrying `requests` on the feed channel.
    ///
    /// # Panics
    /// Panics if the command cannot be sent on the feed channel.
    pub fn send_cancel_requests(&self, requests: OneOrMany<OrderRequestCancel>)
    where
        Event: From<Command>,
    {
        self.send(Command::SendCancelRequests(requests))
    }

    /// Sends a [`Command::SendOpenRequests`] carrying `requests` on the feed channel.
    ///
    /// # Panics
    /// Panics if the command cannot be sent on the feed channel.
    pub fn send_open_requests(&self, requests: OneOrMany<OrderRequestOpen>)
    where
        Event: From<Command>,
    {
        self.send(Command::SendOpenRequests(requests))
    }

    /// Sends a [`Command::ClosePositions`] carrying `filter` on the feed channel.
    ///
    /// # Panics
    /// Panics if the command cannot be sent on the feed channel.
    pub fn close_positions(&self, filter: InstrumentFilter)
    where
        Event: From<Command>,
    {
        self.send(Command::ClosePositions(filter))
    }

    /// Sends a [`Command::CancelOrders`] carrying `filter` on the feed channel.
    ///
    /// # Panics
    /// Panics if the command cannot be sent on the feed channel.
    pub fn cancel_orders(&self, filter: InstrumentFilter)
    where
        Event: From<Command>,
    {
        self.send(Command::CancelOrders(filter))
    }

    /// Sends the provided [`TradingState`] on the feed channel.
    ///
    /// # Panics
    /// Panics if the state cannot be sent on the feed channel.
    pub fn trading_state(&self, trading_state: TradingState)
    where
        Event: From<TradingState>,
    {
        self.send(trading_state)
    }

    /// Moves the audit snapshot and update stream out of the `System`.
    ///
    /// Returns `None` if the `audit` field is already `None`, which is always the case
    /// after a previous call to this method.
    pub fn take_audit(
        &mut self,
    ) -> Option<SnapUpdates<AuditTick<Engine::Snapshot>, UnboundedRx<AuditTick<Engine::Audit>>>>
    {
        self.audit.take()
    }

    /// Sends anything convertible into an `Event` on the feed channel.
    ///
    /// # Panics
    /// Panics if the send fails.
    fn send<T>(&self, event: T)
    where
        T: Into<Event>,
    {
        self.feed_tx
            .send(event)
            .expect("Engine cannot drop Feed receiver")
    }
}
/// Join handles of the tasks that run alongside the Engine task.
///
/// They can be shut down through the `AsyncShutdown` implementation or aborted all at once
/// with `abort`.
#[allow(missing_debug_implementations)]
pub struct SystemAuxillaryHandles {
/// Handles of the execution tasks; shut down via `AsyncShutdown::shutdown`, or consumed
/// as an iterator of join handles when aborting.
pub execution: ExecutionHandles,
/// Join handle of the market-to-engine task.
pub market_to_engine: JoinHandle<()>,
/// Join handle of the account-to-engine task.
pub account_to_engine: JoinHandle<()>,
}
impl AsyncShutdown for SystemAuxillaryHandles {
    type Result = Result<(), JoinError>;

    /// Aborts the market-to-engine and account-to-engine tasks, then awaits the shutdown
    /// of the execution handles and returns its result.
    async fn shutdown(&mut self) -> Self::Result {
        let Self {
            execution,
            market_to_engine,
            account_to_engine,
        } = self;

        // The two forwarding tasks are aborted without being awaited.
        market_to_engine.abort();
        account_to_engine.abort();

        execution.shutdown().await
    }
}
impl SystemAuxillaryHandles {
    /// Aborts every auxiliary task without waiting for any of them to finish.
    ///
    /// The execution handles are aborted first, followed by the market-to-engine task
    /// and then the account-to-engine task.
    pub fn abort(self) {
        let Self {
            execution,
            market_to_engine,
            account_to_engine,
        } = self;

        let forwarders = [market_to_engine, account_to_engine];
        for task in execution.into_iter().chain(forwarders) {
            task.abort();
        }
    }
}