use crate::{
EngineEvent, Sequence,
engine::{
action::{
ActionOutput,
cancel_orders::CancelOrders,
close_positions::ClosePositions,
generate_algo_orders::{GenerateAlgoOrders, GenerateAlgoOrdersOutput},
send_requests::SendRequests,
},
audit::{AuditTick, Auditor, EngineAudit, ProcessAudit, context::EngineContext},
clock::EngineClock,
command::Command,
execution_tx::ExecutionTxMap,
state::{
EngineState,
instrument::data::InstrumentDataState,
order::{in_flight_recorder::InFlightRequestRecorder, manager::OrderManager},
position::{PositionExited, PositionId},
trading::TradingState,
},
},
execution::{AccountStreamEvent, request::ExecutionRequest},
risk::RiskManager,
shutdown::SyncShutdown,
statistic::summary::TradingSummaryGenerator,
strategy::{
algo::AlgoStrategy, close_positions::ClosePositionsStrategy,
on_disconnect::OnDisconnectStrategy, on_trading_disabled::OnTradingDisabled,
},
};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use rustrade_data::{event::MarketEvent, streams::consumer::MarketStreamEvent};
use rustrade_execution::{
AccountEvent,
order::Order,
trade::{AssetFees, Trade, TradeId},
};
use rustrade_instrument::{
Side,
asset::AssetIndex,
exchange::ExchangeIndex,
instrument::{InstrumentIndex, kind::option::OptionKind},
};
use rustrade_integration::channel::Tx;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::{info, warn};
pub mod action;
pub mod audit;
pub mod clock;
pub mod command;
pub mod error;
pub mod execution_tx;
pub mod state;
pub mod run;
pub trait Processor<Event> {
type Audit;
fn process(&mut self, event: Event) -> Self::Audit;
}
pub fn process_with_audit<Event, Engine>(
engine: &mut Engine,
event: Event,
) -> AuditTick<Engine::Audit, EngineContext>
where
Engine: Processor<Event> + Auditor<Engine::Audit, Context = EngineContext>,
{
let output = engine.process(event);
engine.audit(output)
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Engine<Clock, State, ExecutionTxs, Strategy, Risk> {
pub clock: Clock,
pub meta: EngineMeta,
pub state: State,
pub execution_txs: ExecutionTxs,
pub strategy: Strategy,
pub risk: Risk,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub struct EngineMeta {
pub time_start: DateTime<Utc>,
pub sequence: Sequence,
}
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
Processor<EngineEvent<InstrumentData::MarketEventKind>>
for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
Clock: EngineClock + for<'a> Processor<&'a EngineEvent<InstrumentData::MarketEventKind>>,
InstrumentData: InstrumentDataState,
GlobalData: for<'a> Processor<&'a AccountEvent>
+ for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
ExecutionTxs: ExecutionTxMap<ExchangeIndex, InstrumentIndex>,
Strategy: OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
+ OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
+ AlgoStrategy<State = EngineState<GlobalData, InstrumentData>>
+ ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
Risk: RiskManager<State = EngineState<GlobalData, InstrumentData>>,
{
type Audit = EngineAudit<
EngineEvent<InstrumentData::MarketEventKind>,
EngineOutput<Strategy::OnTradingDisabled, Strategy::OnDisconnect>,
>;
fn process(&mut self, event: EngineEvent<InstrumentData::MarketEventKind>) -> Self::Audit {
self.clock.process(&event);
let process_audit = match &event {
EngineEvent::Shutdown(_) => return EngineAudit::process(event),
EngineEvent::Command(command) => {
let output = self.action(command);
if let Some(unrecoverable) = output.unrecoverable_errors() {
return EngineAudit::process_with_output_and_errs(event, unrecoverable, output);
} else {
ProcessAudit::with_output(event, output)
}
}
EngineEvent::TradingStateUpdate(trading_state) => {
let trading_disabled = self.update_from_trading_state_update(*trading_state);
ProcessAudit::with_trading_state_update(event, trading_disabled)
}
EngineEvent::Account(account) => {
let output = self.update_from_account_stream(account);
ProcessAudit::with_account_update(event, output)
}
EngineEvent::Market(market) => {
let output = self.update_from_market_stream(market);
ProcessAudit::with_market_update(event, output)
}
EngineEvent::ContractExpiry(key) => {
let exited = self.process_contract_expiry(key);
let mut audit = ProcessAudit::with_event(event);
for position_exited in exited {
audit = audit.add_output(position_exited);
}
return EngineAudit::from(audit);
}
};
if let TradingState::Enabled = self.state.trading {
let output = self.generate_algo_orders();
if output.is_empty() {
EngineAudit::from(process_audit)
} else if let Some(unrecoverable) = output.unrecoverable_errors() {
EngineAudit::Process(process_audit.add_errors(unrecoverable))
} else {
EngineAudit::from(process_audit.add_output(output))
}
} else {
EngineAudit::from(process_audit)
}
}
}
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk> SyncShutdown
for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
ExecutionTxs: ExecutionTxMap,
{
type Result = ();
fn shutdown(&mut self) -> Self::Result {
self.execution_txs.iter().for_each(|execution_tx| {
let _send_result = execution_tx.send(ExecutionRequest::Shutdown);
});
}
}
impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
{
pub fn action(&mut self, command: &Command) -> ActionOutput
where
InstrumentData: InFlightRequestRecorder,
ExecutionTxs: ExecutionTxMap,
Strategy: ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
Risk: RiskManager,
{
match &command {
Command::SendCancelRequests(requests) => {
info!(
?requests,
"Engine actioning user Command::SendCancelRequests"
);
let output = self.send_requests(requests.clone());
self.state.record_in_flight_cancels(&output.sent);
ActionOutput::CancelOrders(output)
}
Command::SendOpenRequests(requests) => {
info!(?requests, "Engine actioning user Command::SendOpenRequests");
let output = self.send_requests(requests.clone());
self.state.record_in_flight_opens(&output.sent);
ActionOutput::OpenOrders(output)
}
Command::ClosePositions(filter) => {
info!(?filter, "Engine actioning user Command::ClosePositions");
ActionOutput::ClosePositions(self.close_positions(filter))
}
Command::CancelOrders(filter) => {
info!(?filter, "Engine actioning user Command::CancelOrders");
ActionOutput::CancelOrders(self.cancel_orders(filter))
}
}
}
pub fn update_from_trading_state_update(
&mut self,
update: TradingState,
) -> Option<Strategy::OnTradingDisabled>
where
Strategy:
OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
{
self.state
.trading
.update(update)
.transitioned_to_disabled()
.then(|| Strategy::on_trading_disabled(self))
}
pub fn update_from_account_stream(
&mut self,
event: &AccountStreamEvent,
) -> UpdateFromAccountOutput<Strategy::OnDisconnect>
where
InstrumentData: for<'a> Processor<&'a AccountEvent>,
GlobalData: for<'a> Processor<&'a AccountEvent>,
Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
{
match event {
AccountStreamEvent::Reconnecting(exchange) => {
self.state
.connectivity
.update_from_account_reconnecting(exchange);
UpdateFromAccountOutput::OnDisconnect(Strategy::on_disconnect(self, *exchange))
}
AccountStreamEvent::Item(event) => self
.state
.update_from_account(event)
.map(UpdateFromAccountOutput::PositionExit)
.unwrap_or(UpdateFromAccountOutput::None),
}
}
pub fn update_from_market_stream(
&mut self,
event: &MarketStreamEvent<InstrumentIndex, InstrumentData::MarketEventKind>,
) -> UpdateFromMarketOutput<Strategy::OnDisconnect>
where
InstrumentData: InstrumentDataState,
GlobalData:
for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
{
match event {
MarketStreamEvent::Reconnecting(exchange) => {
self.state
.connectivity
.update_from_market_reconnecting(exchange);
UpdateFromMarketOutput::OnDisconnect(Strategy::on_disconnect(self, *exchange))
}
MarketStreamEvent::Item(event) => {
self.state.update_from_market(event);
UpdateFromMarketOutput::None
}
}
}
pub fn trading_summary_generator(&self, risk_free_return: Decimal) -> TradingSummaryGenerator
where
Clock: EngineClock,
{
TradingSummaryGenerator::init(
risk_free_return,
self.meta.time_start,
self.time(),
&self.state.instruments,
&self.state.assets,
)
}
pub fn process_contract_expiry(
&mut self,
key: &InstrumentIndex,
) -> Vec<PositionExited<AssetIndex, InstrumentIndex>>
where
Clock: EngineClock,
InstrumentData: InstrumentDataState + InFlightRequestRecorder,
ExecutionTxs: ExecutionTxMap,
{
let instrument_state = self.state.instruments.instrument_index_mut(key);
if instrument_state.expiration_processed {
return vec![];
}
let cancel_requests: Vec<_> = instrument_state
.orders
.orders()
.filter_map(Order::to_request_cancel)
.collect();
let cancels = self.send_requests(cancel_requests);
self.state.record_in_flight_cancels(&cancels.sent);
let instrument_state = self.state.instruments.instrument_index_mut(key);
if instrument_state.position.positions.is_empty() {
instrument_state.expiration_processed = true;
instrument_state.orders.clear();
instrument_state.exchange_id_to_cid.clear();
instrument_state.position_ids.clear();
instrument_state.pending_fills.clear();
return vec![];
}
use rustrade_instrument::instrument::kind::InstrumentKind;
let option_spec = match &instrument_state.instrument.kind {
InstrumentKind::Option(_) => Some((
instrument_state.instrument.underlying.base,
instrument_state.instrument.underlying.quote,
instrument_state.instrument.exchange,
)),
_ => None,
};
let spot_price = match option_spec {
Some((base_key, quote_key, exchange)) => {
let spot_matches: Vec<_> = self
.state
.instruments
.0
.values()
.filter(|s| {
matches!(&s.instrument.kind, InstrumentKind::Spot)
&& s.instrument.underlying.base == base_key
&& s.instrument.underlying.quote == quote_key
&& s.instrument.exchange == exchange
})
.collect();
if spot_matches.len() > 1 {
warn!(
count = spot_matches.len(),
"process_contract_expiry: multiple Spot instruments match the option \
underlying — using the first. Deduplicate your instrument config."
);
}
spot_matches.into_iter().next().and_then(|s| s.data.price())
}
None => self.state.instruments.instrument_index(key).data.price(),
};
let instrument_state = self.state.instruments.instrument_index_mut(key);
let Some(spot_price) = spot_price else {
warn!(
instrument = ?key,
"ContractExpiry: underlying price unavailable — cannot compute settlement. \
Ensure the underlying spot instrument is subscribed. \
Re-inject ContractExpiry once market data arrives."
);
return vec![];
};
let settlement_price = match &instrument_state.instrument.kind {
InstrumentKind::Option(contract) => {
match contract.kind {
OptionKind::Call => {
if spot_price > contract.strike {
spot_price - contract.strike
} else {
Decimal::ZERO
}
}
OptionKind::Put => {
if contract.strike > spot_price {
contract.strike - spot_price
} else {
Decimal::ZERO
}
}
}
}
_ => spot_price,
};
let position_ids: Vec<PositionId> = instrument_state
.position
.positions
.keys()
.cloned()
.collect();
let engine_time = self.time();
let mut exited = Vec::with_capacity(position_ids.len());
for pos_id in position_ids {
let instrument_state = self.state.instruments.instrument_index_mut(key);
let Some(open_position) = instrument_state.position.positions.get(&pos_id) else {
continue;
};
let closing_side = match open_position.side {
Side::Buy => Side::Sell,
Side::Sell => Side::Buy,
};
let closing_quantity = open_position.quantity_abs;
let trade_tag = format!(
"expiry-settlement-{}-{}",
pos_id,
engine_time.timestamp_micros()
);
let quote_asset = instrument_state.instrument.underlying.quote;
let settlement_trade = Trade {
id: TradeId::new(&trade_tag),
order_id: rustrade_execution::order::id::OrderId::new(&trade_tag),
instrument: *key,
strategy: rustrade_execution::order::id::StrategyId::ENGINE_EXPIRY,
time_exchange: engine_time,
side: closing_side,
price: settlement_price,
quantity: closing_quantity,
fees: AssetFees {
asset: quote_asset,
fees: Decimal::ZERO,
fees_quote: Some(Decimal::ZERO),
},
};
debug_assert_eq!(
settlement_trade.fees.fees,
Decimal::ZERO,
"settlement trade must carry zero fees before update_from_trade_with_id"
);
let contract_size = instrument_state.instrument.kind.contract_size();
if let Some(exit) = instrument_state.position.update_from_trade_with_id(
&settlement_trade,
&pos_id,
contract_size,
) {
instrument_state.tear_sheet.update_from_position(&exit);
exited.push(exit);
}
}
let instrument_state = self.state.instruments.instrument_index_mut(key);
instrument_state.expiration_processed = true;
instrument_state.orders.clear();
instrument_state.exchange_id_to_cid.clear();
instrument_state.position_ids.clear();
instrument_state.pending_fills.clear();
exited
}
}
impl<Clock, State, ExecutionTxs, Strategy, Risk> Engine<Clock, State, ExecutionTxs, Strategy, Risk>
where
Clock: EngineClock,
{
pub fn new(
clock: Clock,
state: State,
execution_txs: ExecutionTxs,
strategy: Strategy,
risk: Risk,
) -> Self {
Self {
meta: EngineMeta {
time_start: clock.time(),
sequence: Sequence(0),
},
clock,
state,
execution_txs,
strategy,
risk,
}
}
pub fn time(&self) -> DateTime<Utc> {
self.clock.time()
}
pub fn reset_metadata(&mut self) {
self.meta.time_start = self.clock.time();
self.meta.sequence = Sequence(0);
}
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum EngineOutput<
OnTradingDisabled,
OnDisconnect,
ExchangeKey = ExchangeIndex,
InstrumentKey = InstrumentIndex,
> {
Commanded(ActionOutput<ExchangeKey, InstrumentKey>),
OnTradingDisabled(OnTradingDisabled),
AccountDisconnect(OnDisconnect),
PositionExit(PositionExited<AssetIndex, InstrumentKey>),
MarketDisconnect(OnDisconnect),
AlgoOrders(GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>),
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateTradingStateOutput<OnTradingDisabled> {
None,
OnTradingDisabled(OnTradingDisabled),
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[allow(clippy::large_enum_variant)] pub enum UpdateFromAccountOutput<OnDisconnect, InstrumentKey = InstrumentIndex> {
None,
OnDisconnect(OnDisconnect),
PositionExit(PositionExited<AssetIndex, InstrumentKey>),
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateFromMarketOutput<OnDisconnect> {
None,
OnDisconnect(OnDisconnect),
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<ActionOutput<ExchangeKey, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: ActionOutput<ExchangeKey, InstrumentKey>) -> Self {
Self::Commanded(value)
}
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<PositionExited<AssetIndex, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: PositionExited<AssetIndex, InstrumentKey>) -> Self {
Self::PositionExit(value)
}
}
impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
From<GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>>
for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
fn from(value: GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>) -> Self {
Self::AlgoOrders(value)
}
}