use crate::error::{IntegrateError, Result};
use crate::specialized::finance::pricing::black_scholes::normal_cdf;
use std::collections::{HashMap, VecDeque};
#[derive(Debug, Clone)]
pub struct MarketTick {
pub symbol: String,
pub price: f64,
pub volume: f64,
pub timestamp: u64,
pub bid: f64,
pub ask: f64,
}
#[derive(Debug, Clone)]
pub enum RiskAlert {
LimitBreach {
symbol: String,
current: f64,
limit: f64,
},
VarExceeded {
portfolio: String,
current_var: f64,
limit: f64,
},
GreeksThreshold {
symbol: String,
greek: String,
value: f64,
},
}
pub trait RiskMonitor: Send + Sync {
fn on_market_data(&mut self, tick: &MarketTick) -> Result<Option<RiskAlert>>;
fn name(&self) -> &str;
}
#[derive(Debug, Clone)]
pub struct OptionPosition {
pub strike: f64,
pub time_to_expiry: f64,
pub volatility: f64,
pub risk_free_rate: f64,
pub quantity: f64,
pub delta_alert_threshold: f64,
}
fn normal_pdf(x: f64) -> f64 {
(-0.5 * x * x).exp() / (2.0 * std::f64::consts::PI).sqrt()
}
fn bs_call_greeks(
spot: f64,
strike: f64,
time: f64,
vol: f64,
rate: f64,
) -> (f64, f64, f64, f64, f64) {
if spot <= 0.0 || strike <= 0.0 || time <= 0.0 || vol <= 0.0 {
return (0.0, 0.0, 0.0, 0.0, 0.0);
}
let sqrt_t = time.sqrt();
let d1 = ((spot / strike).ln() + (rate + 0.5 * vol * vol) * time) / (vol * sqrt_t);
let d2 = d1 - vol * sqrt_t;
let nd1 = normal_cdf(d1);
let nd2 = normal_cdf(d2);
let phi_d1 = normal_pdf(d1);
let delta = nd1;
let gamma = phi_d1 / (spot * vol * sqrt_t);
let vega = spot * phi_d1 * sqrt_t;
let theta = -spot * phi_d1 * vol / (2.0 * sqrt_t) - rate * strike * (-rate * time).exp() * nd2;
let rho = strike * time * (-rate * time).exp() * nd2;
(delta, gamma, vega, theta, rho)
}
#[derive(Debug)]
pub struct StreamingGreeks {
positions: HashMap<String, OptionPosition>,
current_greeks: HashMap<String, (f64, f64, f64, f64, f64)>,
}
impl StreamingGreeks {
pub fn new() -> Self {
Self {
positions: HashMap::new(),
current_greeks: HashMap::new(),
}
}
pub fn add_position(&mut self, symbol: String, pos: OptionPosition) {
self.positions.insert(symbol, pos);
}
pub fn greeks_for(&self, symbol: &str) -> Option<(f64, f64, f64, f64, f64)> {
self.current_greeks.get(symbol).copied()
}
}
impl Default for StreamingGreeks {
fn default() -> Self {
Self::new()
}
}
impl RiskMonitor for StreamingGreeks {
fn name(&self) -> &str {
"StreamingGreeks"
}
fn on_market_data(&mut self, tick: &MarketTick) -> Result<Option<RiskAlert>> {
if let Some(pos) = self.positions.get(&tick.symbol) {
let (delta, gamma, vega, theta, rho) = bs_call_greeks(
tick.price,
pos.strike,
pos.time_to_expiry,
pos.volatility,
pos.risk_free_rate,
);
let scaled_delta = delta * pos.quantity;
let threshold = pos.delta_alert_threshold;
self.current_greeks
.insert(tick.symbol.clone(), (delta, gamma, vega, theta, rho));
if scaled_delta.abs() > threshold {
return Ok(Some(RiskAlert::GreeksThreshold {
symbol: tick.symbol.clone(),
greek: "delta".to_string(),
value: scaled_delta.abs(),
}));
}
}
Ok(None)
}
}
#[derive(Debug)]
pub struct PositionLimitChecker {
limits: HashMap<String, f64>,
current_positions: HashMap<String, f64>,
}
impl PositionLimitChecker {
pub fn new() -> Self {
Self {
limits: HashMap::new(),
current_positions: HashMap::new(),
}
}
pub fn set_limit(&mut self, symbol: String, limit: f64) {
self.limits.insert(symbol, limit);
}
pub fn set_position(&mut self, symbol: String, size: f64) {
self.current_positions.insert(symbol, size);
}
}
impl Default for PositionLimitChecker {
fn default() -> Self {
Self::new()
}
}
impl RiskMonitor for PositionLimitChecker {
fn name(&self) -> &str {
"PositionLimitChecker"
}
fn on_market_data(&mut self, tick: &MarketTick) -> Result<Option<RiskAlert>> {
let Some(&limit) = self.limits.get(&tick.symbol) else {
return Ok(None);
};
let position = self
.current_positions
.get(&tick.symbol)
.copied()
.unwrap_or(0.0);
let notional = tick.price * position.abs();
if notional > limit {
return Ok(Some(RiskAlert::LimitBreach {
symbol: tick.symbol.clone(),
current: notional,
limit,
}));
}
Ok(None)
}
}
#[derive(Debug)]
pub struct IncrementalVaR {
window: VecDeque<f64>,
window_size: usize,
var_limit: f64,
portfolio: String,
prev_price: HashMap<String, f64>,
}
impl IncrementalVaR {
pub fn new(portfolio: impl Into<String>, window_size: usize, var_limit: f64) -> Result<Self> {
if window_size == 0 {
return Err(IntegrateError::ValueError(
"window_size must be positive".to_string(),
));
}
Ok(Self {
window: VecDeque::with_capacity(window_size),
window_size,
var_limit,
portfolio: portfolio.into(),
prev_price: HashMap::new(),
})
}
pub fn current_var(&self) -> Option<f64> {
if self.window.len() < 2 {
return None;
}
let mut sorted: Vec<f64> = self.window.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((1.0 - 0.95) * sorted.len() as f64).floor() as usize;
let idx = idx.min(sorted.len().saturating_sub(1));
Some(-sorted[idx])
}
}
impl RiskMonitor for IncrementalVaR {
fn name(&self) -> &str {
"IncrementalVaR"
}
fn on_market_data(&mut self, tick: &MarketTick) -> Result<Option<RiskAlert>> {
if tick.price <= 0.0 {
return Err(IntegrateError::ValueError(format!(
"Non-positive price {} for {}",
tick.price, tick.symbol
)));
}
if let Some(&prev) = self.prev_price.get(&tick.symbol) {
if prev > 0.0 {
let log_return = (tick.price / prev).ln();
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(log_return);
}
}
self.prev_price.insert(tick.symbol.clone(), tick.price);
if let Some(var) = self.current_var() {
if var > self.var_limit {
return Ok(Some(RiskAlert::VarExceeded {
portfolio: self.portfolio.clone(),
current_var: var,
limit: self.var_limit,
}));
}
}
Ok(None)
}
}
#[derive(Debug, Default)]
pub struct RiskAggregator {
positions: HashMap<String, (f64, f64)>,
var_contributions: HashMap<String, f64>,
}
impl RiskAggregator {
pub fn new() -> Self {
Self::default()
}
pub fn update_position(&mut self, symbol: String, size: f64, price: f64) {
self.positions.insert(symbol, (size, price));
}
pub fn set_var_contribution(&mut self, symbol: String, var: f64) {
self.var_contributions.insert(symbol, var);
}
pub fn total_notional(&self) -> f64 {
self.positions
.values()
.map(|(size, price)| size.abs() * price)
.sum()
}
pub fn total_var(&self) -> f64 {
self.var_contributions.values().copied().sum()
}
}
impl RiskMonitor for RiskAggregator {
fn name(&self) -> &str {
"RiskAggregator"
}
fn on_market_data(&mut self, tick: &MarketTick) -> Result<Option<RiskAlert>> {
if let Some(pos) = self.positions.get_mut(&tick.symbol) {
pos.1 = tick.price;
}
Ok(None)
}
}
pub struct EventDrivenRisk {
monitors: Vec<Box<dyn RiskMonitor>>,
}
impl EventDrivenRisk {
pub fn new() -> Self {
Self {
monitors: Vec::new(),
}
}
pub fn add_monitor(&mut self, monitor: Box<dyn RiskMonitor>) {
self.monitors.push(monitor);
}
pub fn process_tick(&mut self, tick: &MarketTick) -> Vec<RiskAlert> {
let mut alerts = Vec::new();
for monitor in &mut self.monitors {
match monitor.on_market_data(tick) {
Ok(Some(alert)) => alerts.push(alert),
Ok(None) => {}
Err(e) => {
eprintln!("[EventDrivenRisk] monitor '{}' error: {e}", monitor.name());
}
}
}
alerts
}
}
impl Default for EventDrivenRisk {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_tick(symbol: &str, price: f64) -> MarketTick {
MarketTick {
symbol: symbol.to_string(),
price,
volume: 1000.0,
timestamp: 1_700_000_000_000,
bid: price - 0.01,
ask: price + 0.01,
}
}
#[test]
fn test_position_limit_breach() {
let mut checker = PositionLimitChecker::new();
checker.set_limit("AAPL".to_string(), 5_000.0);
checker.set_position("AAPL".to_string(), 100.0);
let tick = make_tick("AAPL", 60.0);
let alert = checker
.on_market_data(&tick)
.expect("monitor should succeed");
match alert {
Some(RiskAlert::LimitBreach {
symbol,
current,
limit,
}) => {
assert_eq!(symbol, "AAPL");
assert!((current - 6_000.0).abs() < 1e-9, "current={current}");
assert!((limit - 5_000.0).abs() < 1e-9, "limit={limit}");
}
other => panic!("Expected LimitBreach, got {other:?}"),
}
}
#[test]
fn test_position_limit_no_breach() {
let mut checker = PositionLimitChecker::new();
checker.set_limit("AAPL".to_string(), 5_000.0);
checker.set_position("AAPL".to_string(), 100.0);
let tick = make_tick("AAPL", 40.0);
let alert = checker
.on_market_data(&tick)
.expect("monitor should succeed");
assert!(alert.is_none(), "Expected no alert, got {alert:?}");
}
#[test]
fn test_incremental_var_known_sequence() {
let mut ivar = IncrementalVaR::new("TEST", 252, 0.0).expect("should construct");
let prices: Vec<f64> = vec![
100.0, 95.0, 100.0, 95.0, 100.0, 95.0, 100.0, 95.0, 100.0, 95.0, 100.0, 95.0, 100.0,
95.0, 100.0, 95.0, 100.0, 95.0, 100.0, 95.0,
];
for &p in &prices {
let tick = make_tick("SPX", p);
let _ = ivar.on_market_data(&tick).expect("tick should succeed");
}
let var = ivar.current_var().expect("should have enough data");
assert!(var > 0.0, "VaR must be positive, got {var}");
assert!(var < 0.2, "VaR seems too large: {var}");
}
#[test]
fn test_event_driven_dispatches_to_multiple_monitors() {
let mut checker1 = PositionLimitChecker::new();
checker1.set_limit("X".to_string(), 1.0); checker1.set_position("X".to_string(), 10.0);
let mut checker2 = PositionLimitChecker::new();
checker2.set_limit("X".to_string(), 1.0);
checker2.set_position("X".to_string(), 10.0);
let mut dispatcher = EventDrivenRisk::new();
dispatcher.add_monitor(Box::new(checker1));
dispatcher.add_monitor(Box::new(checker2));
let tick = make_tick("X", 100.0);
let alerts = dispatcher.process_tick(&tick);
assert_eq!(
alerts.len(),
2,
"Both monitors should have fired an alert, got {}",
alerts.len()
);
}
#[test]
fn test_streaming_greeks_delta_plausible() {
let mut sg = StreamingGreeks::new();
sg.add_position(
"AAPL".to_string(),
OptionPosition {
strike: 150.0,
time_to_expiry: 0.25, volatility: 0.20,
risk_free_rate: 0.05,
quantity: 1.0,
delta_alert_threshold: f64::INFINITY, },
);
let tick = make_tick("AAPL", 155.0);
let _ = sg.on_market_data(&tick).expect("tick should succeed");
let (delta, gamma, vega, _theta, _rho) = sg
.greeks_for("AAPL")
.expect("Greeks should be available after tick");
assert!(delta > 0.0 && delta < 1.0, "delta out of range: {delta}");
assert!(gamma >= 0.0, "gamma negative: {gamma}");
assert!(vega >= 0.0, "vega negative: {vega}");
}
#[test]
fn test_full_pipeline() {
let mut dispatcher = EventDrivenRisk::new();
let mut checker = PositionLimitChecker::new();
checker.set_limit("SPY".to_string(), 1_000.0);
checker.set_position("SPY".to_string(), 20.0);
dispatcher.add_monitor(Box::new(checker));
let ivar = IncrementalVaR::new("portfolio", 252, -f64::INFINITY).expect("should construct");
dispatcher.add_monitor(Box::new(ivar));
let mut sg = StreamingGreeks::new();
sg.add_position(
"SPY".to_string(),
OptionPosition {
strike: 400.0,
time_to_expiry: 0.5,
volatility: 0.18,
risk_free_rate: 0.04,
quantity: 1.0,
delta_alert_threshold: f64::INFINITY,
},
);
dispatcher.add_monitor(Box::new(sg));
let mut agg = RiskAggregator::new();
agg.update_position("SPY".to_string(), 20.0, 420.0);
dispatcher.add_monitor(Box::new(agg));
let mut total_alerts = 0usize;
for i in 0..20u64 {
let tick = MarketTick {
symbol: "SPY".to_string(),
price: 60.0,
volume: 500.0,
timestamp: 1_700_000_000_000 + i * 1_000,
bid: 59.99,
ask: 60.01,
};
let alerts = dispatcher.process_tick(&tick);
total_alerts += alerts.len();
}
assert!(
total_alerts >= 20,
"Expected at least 20 alerts across 20 ticks, got {total_alerts}"
);
}
}