use std::collections::{HashMap, VecDeque};
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::core::{HasAccountId, HasInstrument};
use crate::param::{AccountId, Asset};
use crate::pretrade::policy::{missing_required_field_reject, PolicyGroupId, PolicyName};
use crate::pretrade::start_pre_trade_time::start_pre_trade_now;
use crate::pretrade::ConfigurablePolicy;
use crate::pretrade::DEFAULT_POLICY_GROUP_ID;
use crate::pretrade::{PreTradeContext, PreTradePolicy, Reject, RejectCode, RejectScope, Rejects};
use crate::storage::{ConfigCell, Storage, StorageBuilder};
type StoragePolicy<LPF> = <LPF as crate::storage::LockingPolicyFactory>::Policy;
type TimestampStorage<K, LPF> = Storage<K, VecDeque<Instant>, StoragePolicy<LPF>>;
const RATE_LIMIT_POLICY_NAME: &str = "RateLimitPolicy";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimit {
pub max_orders: usize,
pub window: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitBrokerBarrier {
pub limit: RateLimit,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitAssetBarrier {
pub limit: RateLimit,
pub settlement_asset: Asset,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitAccountBarrier {
pub limit: RateLimit,
pub account_id: AccountId,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitAccountAssetBarrier {
pub limit: RateLimit,
pub account_id: AccountId,
pub settlement_asset: Asset,
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RateLimitPolicyError {
NoBarriersConfigured,
InvalidWindow { window: Duration },
}
impl Display for RateLimitPolicyError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::NoBarriersConfigured => write!(
f,
"at least one broker, asset, account, or account+asset barrier must be configured"
),
Self::InvalidWindow { window } => write!(
f,
"rate limit window must be positive and fit in u64 nanoseconds, got {window:?}"
),
}
}
}
impl std::error::Error for RateLimitPolicyError {}
#[derive(Debug, Clone)]
struct RateLimitSlot {
counter: Arc<AtomicWindowCounter>,
limit: RateLimit,
}
#[derive(Debug, Clone)]
pub struct RateLimitSettings {
asset_limits: HashMap<Asset, RateLimitSlot>,
account_limits: HashMap<AccountId, RateLimit>,
account_asset_limits: HashMap<(AccountId, Asset), RateLimit>,
broker: Option<RateLimitSlot>,
}
impl RateLimitSettings {
pub fn new(
broker: Option<RateLimitBrokerBarrier>,
asset_barriers: impl IntoIterator<Item = RateLimitAssetBarrier>,
account_barriers: impl IntoIterator<Item = RateLimitAccountBarrier>,
account_asset_barriers: impl IntoIterator<Item = RateLimitAccountAssetBarrier>,
) -> Result<Self, RateLimitPolicyError> {
let broker = broker
.map(|b| validate_limit(b.limit).map(fresh_slot))
.transpose()?;
let asset_limits: HashMap<Asset, RateLimitSlot> = asset_barriers
.into_iter()
.map(|b| validate_limit(b.limit).map(|limit| (b.settlement_asset, fresh_slot(limit))))
.collect::<Result<_, _>>()?;
let account_limits: HashMap<AccountId, RateLimit> = account_barriers
.into_iter()
.map(|b| validate_limit(b.limit).map(|limit| (b.account_id, limit)))
.collect::<Result<_, _>>()?;
let account_asset_limits: HashMap<(AccountId, Asset), RateLimit> = account_asset_barriers
.into_iter()
.map(|b| {
validate_limit(b.limit).map(|limit| ((b.account_id, b.settlement_asset), limit))
})
.collect::<Result<_, _>>()?;
if broker.is_none()
&& asset_limits.is_empty()
&& account_limits.is_empty()
&& account_asset_limits.is_empty()
{
return Err(RateLimitPolicyError::NoBarriersConfigured);
}
Ok(Self {
asset_limits,
account_limits,
account_asset_limits,
broker,
})
}
pub fn set_broker(
&mut self,
broker: Option<RateLimitBrokerBarrier>,
) -> Result<(), RateLimitPolicyError> {
let next = match broker {
Some(b) => {
let limit = validate_limit(b.limit)?;
let counter = match self.broker.as_ref() {
Some(slot) => Arc::clone(&slot.counter),
None => Arc::new(AtomicWindowCounter::new(0)),
};
Some(RateLimitSlot { counter, limit })
}
None => None,
};
if next.is_none()
&& self.asset_limits.is_empty()
&& self.account_limits.is_empty()
&& self.account_asset_limits.is_empty()
{
return Err(RateLimitPolicyError::NoBarriersConfigured);
}
self.broker = next;
Ok(())
}
pub fn set_asset_barriers(
&mut self,
barriers: impl IntoIterator<Item = RateLimitAssetBarrier>,
) -> Result<(), RateLimitPolicyError> {
let mut asset_limits: HashMap<Asset, RateLimitSlot> = HashMap::new();
for barrier in barriers {
let limit = validate_limit(barrier.limit)?;
let counter = match self.asset_limits.get(&barrier.settlement_asset) {
Some(slot) => Arc::clone(&slot.counter),
None => Arc::new(AtomicWindowCounter::new(0)),
};
asset_limits.insert(barrier.settlement_asset, RateLimitSlot { counter, limit });
}
if self.broker.is_none()
&& asset_limits.is_empty()
&& self.account_limits.is_empty()
&& self.account_asset_limits.is_empty()
{
return Err(RateLimitPolicyError::NoBarriersConfigured);
}
self.asset_limits = asset_limits;
Ok(())
}
pub fn set_account_barriers(
&mut self,
barriers: impl IntoIterator<Item = RateLimitAccountBarrier>,
) -> Result<(), RateLimitPolicyError> {
let mut account_limits: HashMap<AccountId, RateLimit> = HashMap::new();
for barrier in barriers {
account_limits.insert(barrier.account_id, validate_limit(barrier.limit)?);
}
if self.broker.is_none()
&& self.asset_limits.is_empty()
&& account_limits.is_empty()
&& self.account_asset_limits.is_empty()
{
return Err(RateLimitPolicyError::NoBarriersConfigured);
}
self.account_limits = account_limits;
Ok(())
}
pub fn set_account_asset_barriers(
&mut self,
barriers: impl IntoIterator<Item = RateLimitAccountAssetBarrier>,
) -> Result<(), RateLimitPolicyError> {
let mut account_asset_limits: HashMap<(AccountId, Asset), RateLimit> = HashMap::new();
for barrier in barriers {
account_asset_limits.insert(
(barrier.account_id, barrier.settlement_asset),
validate_limit(barrier.limit)?,
);
}
if self.broker.is_none()
&& self.asset_limits.is_empty()
&& self.account_limits.is_empty()
&& account_asset_limits.is_empty()
{
return Err(RateLimitPolicyError::NoBarriersConfigured);
}
self.account_asset_limits = account_asset_limits;
Ok(())
}
fn rearm(&mut self) {
if let Some(slot) = self.broker.as_mut() {
slot.counter = Arc::new(AtomicWindowCounter::new(0));
}
for slot in self.asset_limits.values_mut() {
slot.counter = Arc::new(AtomicWindowCounter::new(0));
}
}
}
fn fresh_slot(limit: RateLimit) -> RateLimitSlot {
RateLimitSlot {
counter: Arc::new(AtomicWindowCounter::new(0)),
limit,
}
}
#[derive(Debug)]
struct AtomicWindowCounter {
count: AtomicU64,
window_start_nanos: AtomicU64,
}
impl AtomicWindowCounter {
fn new(now_nanos: u64) -> Self {
Self {
count: AtomicU64::new(0),
window_start_nanos: AtomicU64::new(now_nanos),
}
}
fn push(&self, now_nanos: u64, window_nanos: u64) -> u64 {
let win_start = self.window_start_nanos.load(Ordering::Relaxed);
if now_nanos.wrapping_sub(win_start) >= window_nanos
&& self
.window_start_nanos
.compare_exchange(win_start, now_nanos, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
self.count.store(1, Ordering::Release);
return 1;
}
self.count.fetch_add(1, Ordering::AcqRel) + 1
}
fn peek(&self, now_nanos: u64, window_nanos: u64) -> u64 {
let win_start = self.window_start_nanos.load(Ordering::Relaxed);
if now_nanos.wrapping_sub(win_start) >= window_nanos {
return 1;
}
self.count.load(Ordering::Acquire) + 1
}
}
pub struct RateLimitPolicy<LockingPolicyFactory>
where
LockingPolicyFactory: crate::storage::LockingPolicyFactory,
{
group_id: PolicyGroupId,
epoch: Instant,
settings:
<LockingPolicyFactory as crate::storage::LockingPolicyFactory>::Config<RateLimitSettings>,
per_account_timestamps: TimestampStorage<AccountId, LockingPolicyFactory>,
per_account_asset_timestamps: TimestampStorage<(AccountId, Asset), LockingPolicyFactory>,
}
impl<LockingPolicyFactory> RateLimitPolicy<LockingPolicyFactory>
where
LockingPolicyFactory: crate::storage::LockingPolicyFactory,
{
pub const NAME: &'static str = RATE_LIMIT_POLICY_NAME;
pub fn new(
mut settings: RateLimitSettings,
storage_builder: &StorageBuilder<LockingPolicyFactory>,
) -> Self
where
LockingPolicyFactory: crate::storage::CreateStorageFor<AccountId>
+ crate::storage::CreateStorageFor<(AccountId, Asset)>,
{
let epoch = Instant::now();
settings.rearm();
let per_account_timestamps = storage_builder.create_for_bound_key();
let per_account_asset_timestamps = storage_builder.create_for_bound_key();
Self {
group_id: DEFAULT_POLICY_GROUP_ID,
epoch,
settings: <LockingPolicyFactory as crate::storage::LockingPolicyFactory>::new_config(
settings,
),
per_account_timestamps,
per_account_asset_timestamps,
}
}
pub fn with_policy_group_id(mut self, id: PolicyGroupId) -> Self {
self.group_id = id;
self
}
}
impl<LockingPolicyFactory> PolicyName for RateLimitPolicy<LockingPolicyFactory>
where
LockingPolicyFactory: crate::storage::LockingPolicyFactory,
{
fn policy_name(&self) -> &str {
Self::NAME
}
}
impl<LockingPolicyFactory> ConfigurablePolicy<LockingPolicyFactory>
for RateLimitPolicy<LockingPolicyFactory>
where
LockingPolicyFactory: crate::storage::LockingPolicyFactory,
{
type Settings = RateLimitSettings;
fn settings_cell(
&self,
) -> <LockingPolicyFactory as crate::storage::LockingPolicyFactory>::Config<RateLimitSettings>
{
self.settings.clone()
}
}
impl<Order, ExecutionReport, AccountAdjustment, LockingPolicyFactory, Sync>
PreTradePolicy<Order, ExecutionReport, AccountAdjustment, Sync>
for RateLimitPolicy<LockingPolicyFactory>
where
Order: HasAccountId + HasInstrument,
LockingPolicyFactory:
crate::storage::LockingPolicyFactory + crate::storage::CreateStorageFor<AccountId>,
Sync: crate::core::SyncMode<StorageLockingPolicyFactory = LockingPolicyFactory>,
{
fn name(&self) -> &str {
Self::NAME
}
fn policy_group_id(&self) -> PolicyGroupId {
self.group_id
}
#[allow(private_interfaces)]
fn built_in_config_entry(&self) -> Option<crate::core::ConfigEntry<LockingPolicyFactory>> {
Some(crate::core::ConfigEntry::RateLimit(
crate::pretrade::ConfigurablePolicy::settings_cell(self),
))
}
fn check_pre_trade_start(
&self,
_ctx: &PreTradeContext<<Sync as crate::core::SyncMode>::StorageLockingPolicyFactory>,
order: &Order,
) -> Result<(), Rejects> {
let now = start_pre_trade_now();
let now_nanos = now
.checked_duration_since(self.epoch)
.unwrap_or_default()
.as_nanos() as u64;
self.settings.with(|s| -> Result<(), Rejects> {
let needs_settlement = !s.asset_limits.is_empty() || !s.account_asset_limits.is_empty();
let needs_account = !s.account_limits.is_empty() || !s.account_asset_limits.is_empty();
let settlement_opt: Option<Asset> = if needs_settlement {
Some(
order
.instrument()
.map_err(|e| {
Rejects::from(missing_required_field_reject(self, "instrument", &e))
})?
.settlement_asset()
.clone(),
)
} else {
None
};
let account_id_opt: Option<AccountId> = if needs_account {
Some(order.account_id().map_err(|e| {
Rejects::from(missing_required_field_reject(self, "account ID", &e))
})?)
} else {
None
};
let broker = s.broker.as_ref().map(|slot| {
let count = slot.counter.push(now_nanos, window_nanos(&slot.limit));
over_limit(count, &slot.limit, RejectScope::Order, "broker barrier")
});
let asset = settlement_opt.as_ref().and_then(|settlement| {
s.asset_limits.get(settlement).map(|slot| {
let count = slot.counter.push(now_nanos, window_nanos(&slot.limit));
over_limit(count, &slot.limit, RejectScope::Order, "asset barrier")
})
});
let account = account_id_opt.and_then(|account_id| {
s.account_limits.get(&account_id).map(|limit| {
let count = self.per_account_timestamps.with_mut(
account_id,
VecDeque::new,
|entry, _is_new| {
advance_window(entry, now, limit.window);
entry.push_back(now);
entry.len() as u64
},
);
over_limit(count, limit, RejectScope::Account, "account barrier")
})
});
let account_asset = account_id_opt.and_then(|account_id| {
settlement_opt.as_ref().and_then(|settlement| {
let key = (account_id, settlement.clone());
s.account_asset_limits.get(&key).map(|limit| {
let count = self.per_account_asset_timestamps.with_mut(
key,
VecDeque::new,
|entry, _is_new| {
advance_window(entry, now, limit.window);
entry.push_back(now);
entry.len() as u64
},
);
over_limit(count, limit, RejectScope::Account, "account+asset barrier")
})
})
});
match broker
.flatten()
.or_else(|| asset.flatten())
.or_else(|| account.flatten())
.or_else(|| account_asset.flatten())
{
Some(reject) => Err(reject),
None => Ok(()),
}
})
}
fn check_pre_trade_start_dry_run(
&self,
_ctx: &PreTradeContext<<Sync as crate::core::SyncMode>::StorageLockingPolicyFactory>,
order: &Order,
) -> Result<(), Rejects> {
let now = start_pre_trade_now();
let now_nanos = now
.checked_duration_since(self.epoch)
.unwrap_or_default()
.as_nanos() as u64;
self.settings.with(|s| -> Result<(), Rejects> {
let needs_settlement = !s.asset_limits.is_empty() || !s.account_asset_limits.is_empty();
let needs_account = !s.account_limits.is_empty() || !s.account_asset_limits.is_empty();
let settlement_opt: Option<Asset> = if needs_settlement {
Some(
order
.instrument()
.map_err(|e| {
Rejects::from(missing_required_field_reject(self, "instrument", &e))
})?
.settlement_asset()
.clone(),
)
} else {
None
};
let account_id_opt: Option<AccountId> = if needs_account {
Some(order.account_id().map_err(|e| {
Rejects::from(missing_required_field_reject(self, "account ID", &e))
})?)
} else {
None
};
let broker = s.broker.as_ref().map(|slot| {
let count = slot.counter.peek(now_nanos, window_nanos(&slot.limit));
over_limit(count, &slot.limit, RejectScope::Order, "broker barrier")
});
let asset = settlement_opt.as_ref().and_then(|settlement| {
s.asset_limits.get(settlement).map(|slot| {
let count = slot.counter.peek(now_nanos, window_nanos(&slot.limit));
over_limit(count, &slot.limit, RejectScope::Order, "asset barrier")
})
});
let account = account_id_opt.and_then(|account_id| {
s.account_limits.get(&account_id).map(|limit| {
let count = self
.per_account_timestamps
.with(&account_id, |entry| {
would_be_window_count(entry, now, limit.window)
})
.unwrap_or(1);
over_limit(count, limit, RejectScope::Account, "account barrier")
})
});
let account_asset = account_id_opt.and_then(|account_id| {
settlement_opt.as_ref().and_then(|settlement| {
let key = (account_id, settlement.clone());
s.account_asset_limits.get(&key).map(|limit| {
let count = self
.per_account_asset_timestamps
.with(&key, |entry| {
would_be_window_count(entry, now, limit.window)
})
.unwrap_or(1);
over_limit(count, limit, RejectScope::Account, "account+asset barrier")
})
})
});
match broker
.flatten()
.or_else(|| asset.flatten())
.or_else(|| account.flatten())
.or_else(|| account_asset.flatten())
{
Some(reject) => Err(reject),
None => Ok(()),
}
})
}
}
fn window_nanos(limit: &RateLimit) -> u64 {
limit.window.as_nanos() as u64
}
fn over_limit(
count: u64,
limit: &RateLimit,
scope: RejectScope,
reason: &'static str,
) -> Option<Rejects> {
(count > limit.max_orders as u64)
.then(|| rate_limit_reject(scope, reason, count, limit.max_orders as u64, limit.window))
}
fn rate_limit_reject(
scope: RejectScope,
barrier: &'static str,
count: u64,
max_orders: u64,
window: Duration,
) -> Rejects {
Reject::new(
RATE_LIMIT_POLICY_NAME,
scope,
RejectCode::RateLimitExceeded,
format!("rate limit exceeded: {barrier}"),
format!(
"submitted {} orders in {:?} window, max allowed: {}",
count, window, max_orders
),
)
.into()
}
fn validate_limit(limit: RateLimit) -> Result<RateLimit, RateLimitPolicyError> {
if limit.window.is_zero() || limit.window.as_nanos() > u128::from(u64::MAX) {
return Err(RateLimitPolicyError::InvalidWindow {
window: limit.window,
});
}
Ok(limit)
}
fn advance_window(timestamps: &mut VecDeque<Instant>, now: Instant, window: Duration) {
while let Some(oldest) = timestamps.front().copied() {
match now.checked_duration_since(oldest) {
Some(elapsed) if elapsed >= window => {
timestamps.pop_front();
}
_ => break,
}
}
}
fn would_be_window_count(timestamps: &VecDeque<Instant>, now: Instant, window: Duration) -> u64 {
let live = timestamps
.iter()
.filter(
|ts| !matches!(now.checked_duration_since(**ts), Some(elapsed) if elapsed >= window),
)
.count() as u64;
live + 1
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use crate::core::{Instrument, OrderOperation};
use crate::param::{AccountId, Asset, Quantity, Side, TradeAmount};
use crate::pretrade::start_pre_trade_time::with_start_pre_trade_now;
use crate::pretrade::{
ConfigurablePolicy, PreTradeContext, PreTradePolicy, RejectCode, RejectScope, Rejects,
};
use crate::storage::{ConfigCell, NoLocking};
use super::{
RateLimit, RateLimitAccountAssetBarrier, RateLimitAccountBarrier, RateLimitAssetBarrier,
RateLimitBrokerBarrier, RateLimitPolicy, RateLimitPolicyError, RateLimitSettings,
};
type TestPolicy = RateLimitPolicy<NoLocking>;
fn test_builder() -> crate::SyncedEngineBuilder<OrderOperation, (), (), crate::LocalSync> {
crate::Engine::builder().no_sync()
}
fn policy_from(settings: RateLimitSettings) -> TestPolicy {
RateLimitPolicy::new(settings, test_builder().storage_builder())
}
#[test]
fn zero_window_rejected_by_settings() {
let err = RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: Duration::ZERO,
},
}),
[],
[],
[],
)
.expect_err("must fail");
assert_eq!(
err,
RateLimitPolicyError::InvalidWindow {
window: Duration::ZERO
}
);
assert_eq!(
err.to_string(),
"rate limit window must be positive and fit in u64 nanoseconds, got 0ns"
);
}
#[test]
fn sub_microsecond_window_accepted_by_settings() {
let result = RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: Duration::from_nanos(500),
},
}),
[],
[],
[],
);
assert!(result.is_ok());
}
#[test]
fn excessive_window_rejected_by_settings() {
let max_plus_one = Duration::new(u64::MAX, 0) + Duration::from_nanos(1);
let err = RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: max_plus_one,
},
}),
[],
[],
[],
)
.expect_err("must fail");
assert!(matches!(err, RateLimitPolicyError::InvalidWindow { .. }));
}
#[test]
fn no_barriers_configured_rejected_by_settings() {
let err = RateLimitSettings::new(None, [], [], []).expect_err("must fail");
assert_eq!(err, RateLimitPolicyError::NoBarriersConfigured);
assert_eq!(
err.to_string(),
"at least one broker, asset, account, or account+asset barrier must be configured"
);
}
#[test]
fn set_broker_retunes_without_resetting_counter() {
let policy = broker_policy(5, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(2)).is_ok());
policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| {
s.set_broker(Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 2,
window: Duration::from_secs(10),
},
}))
})
.expect("retune must publish");
assert!(check_at(&policy, &o, base + Duration::from_secs(3)).is_err());
}
#[test]
fn set_asset_barriers_adds_axis_at_runtime() {
let policy = broker_policy(100, Duration::from_secs(60));
let usd = Asset::new("USD").expect("asset code must be valid");
policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| {
s.set_asset_barriers([RateLimitAssetBarrier {
settlement_asset: usd.clone(),
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(60),
},
}])
})
.expect("add must publish");
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
let reject = check_at(&policy, &order(account(2)), base + Duration::from_secs(1))
.expect_err("second USD order must breach the new asset barrier");
assert_eq!(reject[0].reason, "rate limit exceeded: asset barrier");
}
#[test]
fn set_broker_none_removes_axis_when_another_remains() {
let mut settings = asset_settings("USD", 100, Duration::from_secs(60));
settings
.set_broker(Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(60),
},
}))
.expect("broker add must succeed");
let policy = policy_from(settings);
policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| s.set_broker(None))
.expect("remove must publish");
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(1)).is_ok());
}
#[test]
fn set_broker_none_clearing_last_axis_fails_and_keeps_config() {
let policy = broker_policy(1, Duration::from_secs(10));
let err = policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| s.set_broker(None))
.expect_err("clearing the last axis must fail");
assert_eq!(err, RateLimitPolicyError::NoBarriersConfigured);
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
assert!(check_at(&policy, &order(account(1)), base + Duration::from_secs(1)).is_err());
}
#[test]
fn set_asset_barriers_rejects_invalid_window_and_keeps_prior() {
let mut settings = asset_settings("USD", 1, Duration::from_secs(10));
let usd = Asset::new("USD").expect("asset code must be valid");
let err = settings
.set_asset_barriers([RateLimitAssetBarrier {
settlement_asset: usd.clone(),
limit: RateLimit {
max_orders: 1,
window: Duration::ZERO,
},
}])
.expect_err("must fail");
assert_eq!(
err,
RateLimitPolicyError::InvalidWindow {
window: Duration::ZERO
}
);
assert_eq!(
settings
.asset_limits
.get(&usd)
.expect("usd set")
.limit
.window,
Duration::from_secs(10)
);
}
#[test]
fn cell_update_retunes_broker_limit_going_forward() {
let policy = broker_policy(5, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(2)).is_ok());
policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| {
s.set_broker(Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 2,
window: Duration::from_secs(10),
},
}))
})
.expect("retune must publish");
assert!(check_at(&policy, &o, base + Duration::from_secs(3)).is_err());
policy
.settings_cell()
.update::<RateLimitPolicyError>(|s| {
s.set_broker(Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 10,
window: Duration::from_secs(10),
},
}))
})
.expect("retune must publish");
let later = base + Duration::from_secs(20);
for i in 0..10 {
assert!(check_at(&policy, &o, later + Duration::from_millis(i)).is_ok());
}
}
#[test]
fn settings_cell_clone_shares_underlying() {
let policy = broker_policy(2, Duration::from_secs(10));
let handle = policy.settings_cell();
handle
.update::<RateLimitPolicyError>(|s| {
s.set_broker(Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 7,
window: Duration::from_secs(10),
},
}))
})
.expect("retune must publish");
assert_eq!(
policy
.settings
.with(|s| s.broker.as_ref().map(|b| b.limit.max_orders)),
Some(7)
);
}
#[test]
fn sliding_window_rejects_when_broker_limit_is_exceeded() {
let policy = broker_policy(2, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
let reject = check_at(&policy, &o, base + Duration::from_secs(2))
.expect_err("third order in window must be rejected");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Order);
assert_eq!(reject.code, RejectCode::RateLimitExceeded);
assert_eq!(reject.reason, "rate limit exceeded: broker barrier");
assert_eq!(
reject.details,
"submitted 3 orders in 10s window, max allowed: 2"
);
}
#[test]
fn expired_timestamps_leave_broker_sliding_window() {
let policy = broker_policy(2, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(11)).is_ok());
}
#[test]
fn rejected_broker_attempts_are_counted_and_not_rolled_back() {
let policy = broker_policy(1, Duration::from_secs(3));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_err());
let reject = check_at(&policy, &o, base + Duration::from_millis(2500))
.expect_err("rejected attempt must stay counted in the window");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Order);
assert_eq!(reject.code, RejectCode::RateLimitExceeded);
assert_eq!(reject.reason, "rate limit exceeded: broker barrier");
assert_eq!(
reject.details,
"submitted 3 orders in 3s window, max allowed: 1"
);
}
#[test]
fn dry_run_consumes_no_broker_budget() {
let policy = broker_policy(1, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
for i in 0..50 {
assert!(
check_dry_run_at(&policy, &o, base + Duration::from_millis(i)).is_ok(),
"dry-run on a window with budget must pass"
);
}
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(2)).is_err());
}
#[test]
fn dry_run_reports_would_be_breach_without_consuming_budget() {
let policy = broker_policy(1, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
let reject = check_dry_run_at(&policy, &o, base + Duration::from_secs(1))
.expect_err("dry-run must report a would-be rate-limit breach");
assert_eq!(reject[0].scope, RejectScope::Order);
assert_eq!(reject[0].code, RejectCode::RateLimitExceeded);
assert_eq!(reject[0].reason, "rate limit exceeded: broker barrier");
assert!(check_dry_run_at(&policy, &o, base + Duration::from_secs(2)).is_err());
assert!(check_at(&policy, &o, base + Duration::from_secs(11)).is_ok());
}
#[test]
fn broker_barrier_applies_to_all_accounts() {
let policy = broker_policy(2, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(1)).is_ok());
let reject = check_at(&policy, &order(account(3)), base + Duration::from_secs(2))
.expect_err("third order across all accounts must be rejected");
assert_eq!(reject[0].scope, RejectScope::Order);
assert_eq!(reject[0].reason, "rate limit exceeded: broker barrier");
}
#[test]
fn asset_barrier_rejects_when_limit_is_exceeded_for_matching_settlement() {
let policy = asset_policy("USD", 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
let reject = check_at(&policy, &order(account(2)), base + Duration::from_secs(1))
.expect_err("second USD order must be rejected by asset barrier");
assert_eq!(reject[0].scope, RejectScope::Order);
assert_eq!(reject[0].code, RejectCode::RateLimitExceeded);
assert_eq!(reject[0].reason, "rate limit exceeded: asset barrier");
}
#[test]
fn asset_barrier_ignores_non_matching_settlement() {
let policy = asset_policy("EUR", 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
assert!(check_at(&policy, &order(account(1)), base + Duration::from_secs(1)).is_ok());
}
#[test]
fn account_barrier_rejects_when_limit_is_exceeded() {
let policy = account_policy(account(1), 2, Duration::from_secs(10));
let o = order(account(1));
let base = Instant::now();
assert!(check_at(&policy, &o, base).is_ok());
assert!(check_at(&policy, &o, base + Duration::from_secs(1)).is_ok());
let reject = check_at(&policy, &o, base + Duration::from_secs(2))
.expect_err("third order for account must be rejected");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Account);
assert_eq!(reject.code, RejectCode::RateLimitExceeded);
assert_eq!(reject.reason, "rate limit exceeded: account barrier");
assert_eq!(
reject.details,
"submitted 3 orders in 10s window, max allowed: 2"
);
}
#[test]
fn different_accounts_track_independently() {
let policy = account_policy(account(1), 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(1)).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(2)).is_ok());
assert!(check_at(&policy, &order(account(1)), base + Duration::from_secs(3)).is_err());
}
#[test]
fn account_without_barrier_passes_when_only_account_barriers_configured() {
let policy = account_policy(account(1), 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(2)), base).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(1)).is_ok());
}
#[test]
fn broker_only_config_does_not_call_instrument() {
let policy = broker_policy(10, Duration::from_secs(60));
let order = NoInstrumentOrder {
account_id: account(1),
};
let result = <TestPolicy as PreTradePolicy<
NoInstrumentOrder,
(),
(),
crate::core::LocalSync,
>>::check_pre_trade_start(
&policy, &PreTradeContext::<NoLocking>::new(None), &order
);
assert!(result.is_ok());
}
#[test]
fn account_only_config_does_not_call_instrument() {
let policy = account_policy(account(1), 10, Duration::from_secs(60));
let order = NoInstrumentOrder {
account_id: account(1),
};
let result = <TestPolicy as PreTradePolicy<
NoInstrumentOrder,
(),
(),
crate::core::LocalSync,
>>::check_pre_trade_start(
&policy, &PreTradeContext::<NoLocking>::new(None), &order
);
assert!(result.is_ok());
}
#[test]
fn account_asset_barrier_rejects_when_limit_is_exceeded() {
let policy = account_asset_policy(account(1), "USD", 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
let reject = check_at(&policy, &order(account(1)), base + Duration::from_secs(1))
.expect_err("second order for account+USD must be rejected");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Account);
assert_eq!(reject.code, RejectCode::RateLimitExceeded);
assert_eq!(reject.reason, "rate limit exceeded: account+asset barrier");
}
#[test]
fn account_asset_barrier_ignores_different_account() {
let policy = account_asset_policy(account(1), "USD", 1, Duration::from_secs(10));
let base = Instant::now();
assert!(check_at(&policy, &order(account(2)), base).is_ok());
assert!(check_at(&policy, &order(account(2)), base + Duration::from_secs(1)).is_ok());
}
#[test]
fn both_barriers_checked_and_account_barrier_triggers_after_broker() {
let policy = policy_from(
RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 3,
window: Duration::from_secs(10),
},
}),
[],
[RateLimitAccountBarrier {
account_id: account(1),
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(10),
},
}],
[],
)
.expect("valid config"),
);
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
let reject = check_at(&policy, &order(account(1)), base + Duration::from_secs(1))
.expect_err("account barrier must trigger");
assert_eq!(reject[0].scope, RejectScope::Account);
assert_eq!(reject[0].reason, "rate limit exceeded: account barrier");
}
#[test]
fn broker_reject_reported_when_both_barriers_breach() {
let policy = policy_from(
RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(10),
},
}),
[],
[RateLimitAccountBarrier {
account_id: account(1),
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(10),
},
}],
[],
)
.expect("valid config"),
);
let base = Instant::now();
assert!(check_at(&policy, &order(account(1)), base).is_ok());
let reject = check_at(&policy, &order(account(1)), base + Duration::from_secs(1))
.expect_err("must reject");
assert_eq!(reject[0].scope, RejectScope::Order);
assert_eq!(reject[0].reason, "rate limit exceeded: broker barrier");
}
#[test]
fn group_id_is_carried_from_policy_builder() {
use crate::pretrade::PolicyGroupId;
let group = PolicyGroupId::new(7);
let settings = RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit {
max_orders: 1,
window: Duration::from_secs(10),
},
}),
[],
[],
[],
)
.expect("valid config");
let policy = policy_from(settings).with_policy_group_id(group);
let observed = <TestPolicy as PreTradePolicy<
OrderOperation,
(),
(),
crate::core::LocalSync,
>>::policy_group_id(&policy);
assert_eq!(observed, group);
}
#[test]
fn account_id_access_error_rejects_with_missing_required_field() {
struct NoAccountId;
impl crate::HasAccountId for NoAccountId {
fn account_id(&self) -> Result<AccountId, crate::RequestFieldAccessError> {
Err(crate::RequestFieldAccessError::new("account_id"))
}
}
impl crate::HasInstrument for NoAccountId {
fn instrument(
&self,
) -> Result<&crate::core::Instrument, crate::RequestFieldAccessError> {
Err(crate::RequestFieldAccessError::new("instrument"))
}
}
let policy = account_policy(account(1), 10, Duration::from_secs(60));
let reject = <TestPolicy as PreTradePolicy<NoAccountId, (), (), crate::core::LocalSync>>::check_pre_trade_start(
&policy,
&PreTradeContext::<NoLocking>::new(None),
&NoAccountId,
)
.expect_err("missing account_id must reject");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Order);
assert_eq!(reject.code, RejectCode::MissingRequiredField);
assert_eq!(
reject.reason,
"failed to access required field 'account ID'"
);
assert_eq!(reject.details, "failed to access field 'account_id'");
}
#[test]
fn broker_only_config_does_not_require_account_id() {
struct NoAccountId;
impl crate::HasAccountId for NoAccountId {
fn account_id(&self) -> Result<AccountId, crate::RequestFieldAccessError> {
Err(crate::RequestFieldAccessError::new("account_id"))
}
}
impl crate::HasInstrument for NoAccountId {
fn instrument(
&self,
) -> Result<&crate::core::Instrument, crate::RequestFieldAccessError> {
Err(crate::RequestFieldAccessError::new("instrument"))
}
}
let policy = broker_policy(10, Duration::from_secs(60));
assert!(
<TestPolicy as PreTradePolicy<NoAccountId, (), (), crate::core::LocalSync>>::check_pre_trade_start(
&policy,
&PreTradeContext::<NoLocking>::new(None),
&NoAccountId,
)
.is_ok()
);
}
#[test]
fn asset_axis_with_no_instrument_returns_missing_required_field() {
let policy = asset_policy("USD", 10, Duration::from_secs(60));
let order = NoInstrumentOrder {
account_id: account(1),
};
let reject = <TestPolicy as PreTradePolicy<
NoInstrumentOrder,
(),
(),
crate::core::LocalSync,
>>::check_pre_trade_start(
&policy, &PreTradeContext::<NoLocking>::new(None), &order
)
.expect_err("asset-axis policy must require instrument");
let reject = &reject[0];
assert_eq!(reject.scope, RejectScope::Order);
assert_eq!(reject.code, RejectCode::MissingRequiredField);
assert_eq!(
reject.reason,
"failed to access required field 'instrument'"
);
assert_eq!(reject.details, "failed to access field 'instrument'");
}
struct NoInstrumentOrder {
account_id: AccountId,
}
impl crate::HasAccountId for NoInstrumentOrder {
fn account_id(&self) -> Result<AccountId, crate::RequestFieldAccessError> {
Ok(self.account_id)
}
}
impl crate::HasInstrument for NoInstrumentOrder {
fn instrument(&self) -> Result<&Instrument, crate::RequestFieldAccessError> {
Err(crate::RequestFieldAccessError::new("instrument"))
}
}
fn check_at(policy: &TestPolicy, order: &OrderOperation, now: Instant) -> Result<(), Rejects> {
with_start_pre_trade_now(now, || {
<TestPolicy as PreTradePolicy<OrderOperation, (), (), crate::core::LocalSync>>::check_pre_trade_start(
policy,
&PreTradeContext::<NoLocking>::new(None),
order,
)
})
}
fn check_dry_run_at(
policy: &TestPolicy,
order: &OrderOperation,
now: Instant,
) -> Result<(), Rejects> {
with_start_pre_trade_now(now, || {
<TestPolicy as PreTradePolicy<OrderOperation, (), (), crate::core::LocalSync>>::check_pre_trade_start_dry_run(
policy,
&PreTradeContext::<NoLocking>::new(None),
order,
)
})
}
fn broker_settings(max_orders: usize, window: Duration) -> RateLimitSettings {
RateLimitSettings::new(
Some(RateLimitBrokerBarrier {
limit: RateLimit { max_orders, window },
}),
[],
[],
[],
)
.expect("valid config")
}
fn broker_policy(max_orders: usize, window: Duration) -> TestPolicy {
policy_from(broker_settings(max_orders, window))
}
fn asset_settings(settlement: &str, max_orders: usize, window: Duration) -> RateLimitSettings {
RateLimitSettings::new(
None,
[RateLimitAssetBarrier {
limit: RateLimit { max_orders, window },
settlement_asset: Asset::new(settlement).expect("asset code must be valid"),
}],
[],
[],
)
.expect("valid config")
}
fn asset_policy(settlement: &str, max_orders: usize, window: Duration) -> TestPolicy {
policy_from(asset_settings(settlement, max_orders, window))
}
fn account_settings(
account_id: AccountId,
max_orders: usize,
window: Duration,
) -> RateLimitSettings {
RateLimitSettings::new(
None,
[],
[RateLimitAccountBarrier {
account_id,
limit: RateLimit { max_orders, window },
}],
[],
)
.expect("valid config")
}
fn account_policy(account_id: AccountId, max_orders: usize, window: Duration) -> TestPolicy {
policy_from(account_settings(account_id, max_orders, window))
}
fn account_asset_settings(
account_id: AccountId,
settlement: &str,
max_orders: usize,
window: Duration,
) -> RateLimitSettings {
RateLimitSettings::new(
None,
[],
[],
[RateLimitAccountAssetBarrier {
account_id,
settlement_asset: Asset::new(settlement).expect("asset code must be valid"),
limit: RateLimit { max_orders, window },
}],
)
.expect("valid config")
}
fn account_asset_policy(
account_id: AccountId,
settlement: &str,
max_orders: usize,
window: Duration,
) -> TestPolicy {
policy_from(account_asset_settings(
account_id, settlement, max_orders, window,
))
}
fn order(account_id: AccountId) -> OrderOperation {
OrderOperation {
instrument: Instrument::new(
Asset::new("AAPL").expect("asset code must be valid"),
Asset::new("USD").expect("asset code must be valid"),
),
account_id,
side: Side::Buy,
trade_amount: TradeAmount::Quantity(
Quantity::from_str("1").expect("quantity literal must be valid"),
),
price: None,
}
}
fn account(id: u64) -> AccountId {
AccountId::from_u64(id)
}
}