use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use super::sync_mode::SyncMode;
use crate::param::{AccountId, Asset, Pnl};
use crate::pretrade::policies::{
OrderSizeLimitSettings, PnlBoundsKillSwitchSettings, RateLimitSettings, RealizedPnlStorage,
SpotFundsSettings,
};
use crate::storage::{ConfigCell, LockingPolicyFactory};
/// One configurable policy held by `ConfigRegistry`, tagged by policy kind.
///
/// Every variant carries the `Factory::Config` cell for that kind's settings type.
pub(crate) enum ConfigEntry<Factory: LockingPolicyFactory> {
/// Settings cell of a rate-limit policy.
RateLimit(Factory::Config<RateLimitSettings>),
/// State of a PnL-bounds kill-switch policy.
PnlBoundsKillSwitch {
/// Settings cell of the kill switch.
settings: Factory::Config<PnlBoundsKillSwitchSettings>,
/// Realized-PnL storage kept next to the settings; written by
/// `Configurator::set_account_pnl`.
realized: RealizedPnlStorage<Factory>,
},
/// Settings cell of a spot-funds policy.
SpotFunds(Factory::Config<SpotFundsSettings>),
/// Settings cell of an order-size-limit policy.
OrderSizeLimit(Factory::Config<OrderSizeLimitSettings>),
}
impl<Factory: LockingPolicyFactory> ConfigEntry<Factory> {
    /// Returns the type name of the settings cell this entry stores.
    ///
    /// The name is produced by `std::any::type_name` for the variant's
    /// `Factory::Config<...>` type and is used as the `found` side of
    /// `ConfigureError::PolicyTypeMismatch`.
    fn settings_type_name(&self) -> &'static str {
        use std::any::type_name;

        match self {
            Self::RateLimit(..) => type_name::<Factory::Config<RateLimitSettings>>(),
            Self::PnlBoundsKillSwitch { .. } => {
                type_name::<Factory::Config<PnlBoundsKillSwitchSettings>>()
            }
            Self::SpotFunds(..) => type_name::<Factory::Config<SpotFundsSettings>>(),
            Self::OrderSizeLimit(..) => type_name::<Factory::Config<OrderSizeLimitSettings>>(),
        }
    }
}
/// Reasons a `Configurator` call can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConfigureError {
    /// No configurable policy is registered under `name`.
    UnknownPolicy {
        /// The policy name that was looked up.
        name: String,
    },
    /// A policy is registered under `name`, but it is of a different kind than the
    /// `Configurator` method that was called.
    PolicyTypeMismatch {
        /// The policy name that was looked up.
        name: String,
        /// Type name of the settings cell the called method works on.
        expected: &'static str,
        /// Type name of the settings cell actually registered.
        found: &'static str,
    },
    /// The caller's update callback returned an error.
    Validation {
        /// The policy whose update was attempted.
        name: String,
        /// `Display` rendering of the callback's error.
        message: String,
    },
    /// A configuration call was made while the same thread was already inside a
    /// configuration callback for the same registry.
    NestedConfiguration,
}

impl Display for ConfigureError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, out: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NestedConfiguration => out.write_str(
                "configuration is not reentrant: cannot configure from within \
                 another configuration callback on the same thread",
            ),
            Self::Validation { name, message } => {
                write!(out, "policy {name} rejected the update: {message}")
            }
            Self::PolicyTypeMismatch {
                name,
                expected,
                found,
            } => {
                write!(out, "policy {name} has settings type {found}, not {expected}")
            }
            Self::UnknownPolicy { name } => {
                write!(out, "no configurable policy named {name}")
            }
        }
    }
}

impl std::error::Error for ConfigureError {}
/// Name-keyed collection of configurable policy entries.
pub(crate) struct ConfigRegistry<Factory: LockingPolicyFactory> {
/// Entries keyed by the policy name that callers pass to the `Configurator` methods.
entries: HashMap<String, ConfigEntry<Factory>>,
}
impl<Factory: LockingPolicyFactory> ConfigRegistry<Factory> {
    /// Builds a registry that owns the given name-to-entry map.
    pub(crate) fn from_entries(entries: HashMap<String, ConfigEntry<Factory>>) -> Self {
        Self { entries }
    }

    /// Looks up the entry registered under `name`.
    ///
    /// # Errors
    ///
    /// Returns `ConfigureError::UnknownPolicy` carrying `name` when no entry exists.
    fn entry(&self, name: &str) -> Result<&ConfigEntry<Factory>, ConfigureError> {
        match self.entries.get(name) {
            Some(found) => Ok(found),
            None => Err(ConfigureError::UnknownPolicy {
                name: name.to_owned(),
            }),
        }
    }

    /// Builds the `PolicyTypeMismatch` error for a caller that wanted a
    /// `Factory::Config<Settings>` cell under `name` but found `entry` instead.
    fn type_mismatch<Settings: Clone + 'static>(
        name: &str,
        entry: &ConfigEntry<Factory>,
    ) -> ConfigureError {
        let expected = std::any::type_name::<Factory::Config<Settings>>();
        let found = entry.settings_type_name();
        ConfigureError::PolicyTypeMismatch {
            name: name.to_owned(),
            expected,
            found,
        }
    }

    /// Wraps an update callback's error as `ConfigureError::Validation`, keeping only
    /// its `Display` text.
    fn validation<Error: std::fmt::Display>(name: &str, error: Error) -> ConfigureError {
        let name = name.to_owned();
        let message = error.to_string();
        ConfigureError::Validation { name, message }
    }
}
thread_local! {
/// Identities whose configuration callback is currently running on this thread.
///
/// `ConfiguringGuard::enter` pushes an identity and the guard's `Drop` removes it;
/// `Configurator::enter_configuration` uses the registry's address as the identity.
static CONFIGURING: RefCell<Vec<usize>> = const { RefCell::new(Vec::new()) };
}
/// Keeps one identity listed in `CONFIGURING` for as long as the guard is alive.
///
/// Created by `ConfiguringGuard::enter`; the `Drop` impl removes the identity again.
struct ConfiguringGuard {
/// The value that `enter` pushed onto `CONFIGURING` for this guard.
identity: usize,
}
impl ConfiguringGuard {
    /// Records `identity` as active on the current thread and returns the guard that
    /// keeps it recorded.
    ///
    /// # Errors
    ///
    /// Returns `ConfigureError::NestedConfiguration` when `identity` is already active
    /// on this thread; nothing is recorded in that case.
    fn enter(identity: usize) -> Result<Self, ConfigureError> {
        CONFIGURING.with(|cell| {
            let mut stack = cell.borrow_mut();
            let already_active = stack.iter().any(|active| *active == identity);
            if already_active {
                Err(ConfigureError::NestedConfiguration)
            } else {
                stack.push(identity);
                Ok(Self { identity })
            }
        })
    }
}
impl Drop for ConfiguringGuard {
    /// Removes this guard's identity from the current thread's `CONFIGURING` list.
    ///
    /// The most recently pushed matching entry is removed with `swap_remove`, so the
    /// order of the remaining entries is not preserved. A missing identity is ignored.
    fn drop(&mut self) {
        let identity = self.identity;
        CONFIGURING.with(|cell| {
            let mut stack = cell.borrow_mut();
            let newest = stack.iter().rposition(|active| *active == identity);
            if let Some(index) = newest {
                stack.swap_remove(index);
            }
        });
    }
}
/// Shorthand for the storage locking-policy factory associated with a `SyncMode`.
type RegistryFactory<Sync> = <Sync as SyncMode>::StorageLockingPolicyFactory;
/// Public handle through which callers adjust the settings of registered policies by name.
pub struct Configurator<Sync: SyncMode> {
/// Shared handle to the registry of configurable entries. The address of the registry
/// behind it doubles as the reentrancy identity in `enter_configuration`.
registry: <<Sync as SyncMode>::StorageLockingPolicyFactory as LockingPolicyFactory>::Shared<
ConfigRegistry<RegistryFactory<Sync>>,
>,
}
impl<Sync: SyncMode> Clone for Configurator<Sync> {
    /// Clones the `Shared` registry handle into a new configurator.
    ///
    /// Written by hand because `#[derive(Clone)]` would add a `Sync: Clone` bound.
    fn clone(&self) -> Self {
        let registry = self.registry.clone();
        Self { registry }
    }
}
impl<Sync: SyncMode> Configurator<Sync> {
pub(crate) fn from_inner(
registry: <<Sync as SyncMode>::StorageLockingPolicyFactory as LockingPolicyFactory>::Shared<
ConfigRegistry<RegistryFactory<Sync>>,
>,
) -> Self {
Self { registry }
}
fn enter_configuration(&self) -> Result<ConfiguringGuard, ConfigureError> {
ConfiguringGuard::enter(std::ptr::from_ref(&*self.registry) as usize)
}
pub fn rate_limit<Error: std::fmt::Display>(
&self,
name: &str,
f: impl FnOnce(&mut RateLimitSettings) -> Result<(), Error>,
) -> Result<(), ConfigureError> {
match self.registry.entry(name)? {
ConfigEntry::RateLimit(cell) => {
let _guard = self.enter_configuration()?;
cell.update(f).map_err(|error| {
ConfigRegistry::<RegistryFactory<Sync>>::validation(name, error)
})
}
entry => Err(ConfigRegistry::<_>::type_mismatch::<RateLimitSettings>(
name, entry,
)),
}
}
pub fn pnl_bounds_killswitch<Error: std::fmt::Display>(
&self,
name: &str,
f: impl FnOnce(&mut PnlBoundsKillSwitchSettings) -> Result<(), Error>,
) -> Result<(), ConfigureError> {
match self.registry.entry(name)? {
ConfigEntry::PnlBoundsKillSwitch { settings, .. } => {
let _guard = self.enter_configuration()?;
settings.update(f).map_err(|error| {
ConfigRegistry::<RegistryFactory<Sync>>::validation(name, error)
})
}
entry => Err(ConfigRegistry::<_>::type_mismatch::<
PnlBoundsKillSwitchSettings,
>(name, entry)),
}
}
pub fn set_account_pnl(
&self,
name: &str,
account: AccountId,
settlement_asset: Asset,
pnl: Pnl,
) -> Result<(), ConfigureError> {
match self.registry.entry(name)? {
ConfigEntry::PnlBoundsKillSwitch { realized, .. } => {
realized.with_mut(
(account, settlement_asset),
|| Pnl::ZERO,
|entry, _is_new| *entry = pnl,
);
Ok(())
}
entry => Err(ConfigRegistry::<_>::type_mismatch::<
PnlBoundsKillSwitchSettings,
>(name, entry)),
}
}
pub fn spot_funds<Error: std::fmt::Display>(
&self,
name: &str,
f: impl FnOnce(&mut SpotFundsSettings) -> Result<(), Error>,
) -> Result<(), ConfigureError> {
match self.registry.entry(name)? {
ConfigEntry::SpotFunds(cell) => {
let _guard = self.enter_configuration()?;
cell.update(f).map_err(|error| {
ConfigRegistry::<RegistryFactory<Sync>>::validation(name, error)
})
}
entry => Err(ConfigRegistry::<_>::type_mismatch::<SpotFundsSettings>(
name, entry,
)),
}
}
pub fn order_size_limit<Error: std::fmt::Display>(
&self,
name: &str,
f: impl FnOnce(&mut OrderSizeLimitSettings) -> Result<(), Error>,
) -> Result<(), ConfigureError> {
match self.registry.entry(name)? {
ConfigEntry::OrderSizeLimit(cell) => {
let _guard = self.enter_configuration()?;
cell.update(f).map_err(|error| {
ConfigRegistry::<RegistryFactory<Sync>>::validation(name, error)
})
}
entry => Err(ConfigRegistry::<_>::type_mismatch::<OrderSizeLimitSettings>(name, entry)),
}
}
}
#[cfg(test)]
mod tests {
// Tests for the per-thread reentrancy guard around `Configurator` updates. They drive
// full-sync engines built with `FullLocking` policies and configure them from inside
// another configuration callback.
use std::cell::RefCell;
use std::time::Duration;
use crate::param::{AccountId, Asset, Quantity, Side, TradeAmount, Volume};
use crate::pretrade::policies::{
OrderSizeBrokerBarrier, OrderSizeLimit, OrderSizeLimitPolicy, OrderSizeLimitPolicyError,
OrderSizeLimitSettings, RateLimit, RateLimitBrokerBarrier, RateLimitPolicy,
RateLimitPolicyError, RateLimitSettings,
};
use crate::storage::FullLocking;
use crate::{Engine, FullSyncEngine, Instrument, OrderOperation};
use super::ConfigureError;
/// Broker-level rate-limit barrier allowing `max_orders` per 60-second window.
fn broker_barrier(max_orders: usize) -> RateLimitBrokerBarrier {
RateLimitBrokerBarrier {
limit: RateLimit {
max_orders,
window: Duration::from_secs(60),
},
}
}
/// Broker-level order-size barrier with the given maximum quantity and a fixed
/// maximum notional of `1000000`. Panics if `max_quantity` does not parse.
fn order_size_broker(max_quantity: &str) -> OrderSizeBrokerBarrier {
OrderSizeBrokerBarrier {
limit: OrderSizeLimit {
max_quantity: Quantity::from_str(max_quantity)
.expect("quantity literal must be valid"),
max_notional: Volume::from_str("1000000").expect("volume literal must be valid"),
},
}
}
/// Builds a full-sync engine whose only pre-trade policy is a rate limit with a broker
/// barrier of `max_orders`.
fn build_engine(max_orders: usize) -> FullSyncEngine<OrderOperation> {
let builder = Engine::builder::<OrderOperation, (), ()>().full_sync();
let settings = RateLimitSettings::new(Some(broker_barrier(max_orders)), [], [], [])
.expect("broker barrier is a valid configuration");
let policy = RateLimitPolicy::<FullLocking>::new(settings, builder.storage_builder());
builder
.pre_trade(policy)
.build()
.expect("engine must build")
}
/// Builds a full-sync engine with two pre-trade policies: a rate limit with a broker
/// barrier of `max_orders`, and an order-size limit with a broker maximum quantity of 100.
fn build_engine_with_order_size(max_orders: usize) -> FullSyncEngine<OrderOperation> {
let builder = Engine::builder::<OrderOperation, (), ()>().full_sync();
let rate_settings = RateLimitSettings::new(Some(broker_barrier(max_orders)), [], [], [])
.expect("broker barrier is a valid configuration");
let size_settings = OrderSizeLimitSettings::new(Some(order_size_broker("100")), [], [])
.expect("order-size broker barrier is a valid configuration");
let rate_policy =
RateLimitPolicy::<FullLocking>::new(rate_settings, builder.storage_builder());
let size_policy = OrderSizeLimitPolicy::<FullLocking>::new(size_settings);
builder
.pre_trade(rate_policy)
.pre_trade(size_policy)
.build()
.expect("engine must build")
}
/// Buy order for quantity 1 of AAPL/USD with no price, placed by account `account`.
fn order(account: u64) -> OrderOperation {
OrderOperation {
instrument: Instrument::new(
Asset::new("AAPL").expect("asset code must be valid"),
Asset::new("USD").expect("asset code must be valid"),
),
account_id: AccountId::from_u64(account),
side: Side::Buy,
trade_amount: TradeAmount::Quantity(
Quantity::from_str("1").expect("quantity literal must be valid"),
),
price: None,
}
}
/// Configuring the same policy from inside its own configuration callback must return
/// `NestedConfiguration` instead of blocking, and the failed outer update must leave
/// the limit at 2.
#[test]
fn nested_same_thread_configuration_is_rejected_without_deadlock() {
let engine = build_engine(2);
let name = RateLimitPolicy::<FullLocking>::NAME;
// Use up the broker limit of 2 so a third order shows whether the limit changed.
for account in 0..2 {
engine
.execute_pre_trade(order(account))
.expect("order within the limit must pass");
}
let nested = RefCell::new(None);
let outer = engine
.configure()
.rate_limit::<ConfigureError>(name, |settings| {
settings
.set_broker(Some(broker_barrier(9)))
.expect("widening to 9 is a valid private-copy edit");
let inner = engine
.configure()
.rate_limit::<RateLimitPolicyError>(name, |settings| {
settings.set_broker(Some(broker_barrier(100)))
});
let error = inner.expect_err("nested configuration must be rejected");
*nested.borrow_mut() = Some(error.clone());
// Fail the outer update with the nested error so it surfaces as `Validation`.
Err(error)
});
assert_eq!(
nested.into_inner(),
Some(ConfigureError::NestedConfiguration)
);
assert_eq!(
outer,
Err(ConfigureError::Validation {
name: name.to_owned(),
message: ConfigureError::NestedConfiguration.to_string(),
})
);
// The outer callback failed, so neither the widening to 9 nor to 100 may be visible.
let rejects = engine
.execute_pre_trade(order(2))
.err()
.expect("limit must still be 2 after the rejected retune");
assert_eq!(rejects[0].reason, "rate limit exceeded: broker barrier");
}
/// Configuring a different policy of the same engine from inside a configuration
/// callback must also be rejected with `NestedConfiguration`.
#[test]
fn nested_configuration_of_different_policy_in_same_engine_is_rejected() {
let engine = build_engine_with_order_size(2);
let rate_name = RateLimitPolicy::<FullLocking>::NAME;
let size_name = OrderSizeLimitPolicy::<FullLocking>::NAME;
let nested = RefCell::new(None);
let outer = engine
.configure()
.rate_limit::<ConfigureError>(rate_name, |_settings| {
let inner = engine
.configure()
.order_size_limit::<OrderSizeLimitPolicyError>(size_name, |settings| {
settings.set_broker(Some(order_size_broker("200")))
});
let error = inner.expect_err("nested configuration must be rejected");
*nested.borrow_mut() = Some(error.clone());
Err(error)
});
assert_eq!(
nested.into_inner(),
Some(ConfigureError::NestedConfiguration)
);
assert_eq!(
outer,
Err(ConfigureError::Validation {
name: rate_name.to_owned(),
message: ConfigureError::NestedConfiguration.to_string(),
})
);
}
/// Configuring a second, independent engine from inside the first engine's callback
/// must succeed, and the second engine must observe the new limit.
#[test]
fn nested_configuration_of_independent_engine_is_allowed() {
let engine_a = build_engine(2);
let engine_b = build_engine(2);
let name = RateLimitPolicy::<FullLocking>::NAME;
engine_a
.configure()
.rate_limit::<RateLimitPolicyError>(name, |_settings| {
engine_b
.configure()
.rate_limit(name, |settings| {
settings.set_broker(Some(broker_barrier(9)))
})
.expect("independent engine update must succeed");
Ok(())
})
.expect("independent engine configuration must not be rejected");
// Three orders exceed the original limit of 2, so they pass only if engine B now uses 9.
for account in 0..3 {
engine_b
.execute_pre_trade(order(account))
.expect("engine B broker limit must have been widened");
}
}
}