use std::collections::{BTreeMap, HashMap, HashSet};
use nautilus_core::UnixNanos;
use nautilus_model::{
data::{
QuoteTick,
option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange},
},
enums::OptionKind,
identifiers::{InstrumentId, OptionSeriesId},
types::Price,
};
use super::{
AtmTracker,
constants::{DEFAULT_REBALANCE_COOLDOWN_NS, DEFAULT_REBALANCE_HYSTERESIS},
};
/// Per-series option chain state: the instrument catalog, the currently active
/// subset, and the latest quote/greeks buffered per strike.
#[derive(Debug)]
pub struct OptionChainAggregator {
/// Series this aggregator belongs to; its `expiration_ns` drives expiry checks.
series_id: OptionSeriesId,
/// Rule resolved against the ATM price and catalog strikes to pick active strikes.
strike_range: StrikeRange,
/// Source of the current ATM price.
atm_tracker: AtmTracker,
/// Full catalog: instrument id -> (strike, option kind).
instruments: HashMap<InstrumentId, (Price, OptionKind)>,
/// Subset of the catalog currently active; only these accept quote/greeks updates.
active_ids: HashSet<InstrumentId>,
/// Catalog strike closest to the ATM price when the active set was last computed.
last_atm_strike: Option<Price>,
/// Fraction of the gap to the neighbouring strike the ATM price must cross
/// before a rebalance is proposed; values `<= 0.0` disable the guard.
hysteresis: f64,
/// Minimum nanoseconds between rebalances; `0` disables the guard.
cooldown_ns: u64,
/// Timestamp passed to the most recent `apply_rebalance`, if any.
last_rebalance_ns: Option<UnixNanos>,
/// Largest `ts_event` among accepted quotes; the default value means "none yet".
max_ts_event: UnixNanos,
/// Greeks that arrived before their strike had a buffer entry, keyed by instrument.
pending_greeks: HashMap<InstrumentId, OptionGreeks>,
/// Latest call data keyed by strike.
call_buffer: BTreeMap<Price, OptionStrikeData>,
/// Latest put data keyed by strike.
put_buffer: BTreeMap<Price, OptionStrikeData>,
}
impl OptionChainAggregator {
pub fn new(
series_id: OptionSeriesId,
strike_range: StrikeRange,
atm_tracker: AtmTracker,
instruments: HashMap<InstrumentId, (Price, OptionKind)>,
) -> Self {
let all_strikes = Self::sorted_strikes(&instruments);
let atm_price = atm_tracker.atm_price();
let active_strikes: HashSet<Price> = strike_range
.resolve(atm_price, &all_strikes)
.into_iter()
.collect();
let active_ids: HashSet<InstrumentId> = instruments
.iter()
.filter(|(_, (strike, _))| active_strikes.contains(strike))
.map(|(id, _)| *id)
.collect();
let last_atm_strike =
atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
Self {
series_id,
strike_range,
atm_tracker,
instruments,
active_ids,
last_atm_strike,
hysteresis: DEFAULT_REBALANCE_HYSTERESIS,
cooldown_ns: DEFAULT_REBALANCE_COOLDOWN_NS,
last_rebalance_ns: None,
max_ts_event: UnixNanos::default(),
pending_greeks: HashMap::new(),
call_buffer: BTreeMap::new(),
put_buffer: BTreeMap::new(),
}
}
pub fn atm_tracker_mut(&mut self) -> &mut AtmTracker {
&mut self.atm_tracker
}
#[must_use]
pub fn instrument_ids(&self) -> Vec<InstrumentId> {
self.active_ids.iter().copied().collect()
}
#[must_use]
pub fn active_ids(&self) -> &HashSet<InstrumentId> {
&self.active_ids
}
#[must_use]
pub fn series_id(&self) -> OptionSeriesId {
self.series_id
}
#[must_use]
pub fn is_expired(&self, now_ns: UnixNanos) -> bool {
now_ns >= self.series_id.expiration_ns
}
#[must_use]
pub fn instruments(&self) -> &HashMap<InstrumentId, (Price, OptionKind)> {
&self.instruments
}
#[must_use]
pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
self.instruments.keys().copied().collect()
}
#[must_use]
pub fn is_catalog_empty(&self) -> bool {
self.instruments.is_empty()
}
#[must_use]
pub fn remove_instrument(&mut self, instrument_id: &InstrumentId) -> bool {
let Some((strike, kind)) = self.instruments.remove(instrument_id) else {
return false;
};
self.active_ids.remove(instrument_id);
self.pending_greeks.remove(instrument_id);
let has_sibling = self
.instruments
.values()
.any(|(s, k)| *s == strike && *k == kind);
if !has_sibling {
let buffer = match kind {
OptionKind::Call => &mut self.call_buffer,
OptionKind::Put => &mut self.put_buffer,
};
buffer.remove(&strike);
}
true
}
#[must_use]
pub fn atm_tracker(&self) -> &AtmTracker {
&self.atm_tracker
}
pub fn recompute_active_set(&mut self) -> Vec<InstrumentId> {
let atm_price = self.atm_tracker.atm_price();
let all_strikes = Self::sorted_strikes(&self.instruments);
let active_strikes: HashSet<Price> = self
.strike_range
.resolve(atm_price, &all_strikes)
.into_iter()
.collect();
self.active_ids = self
.instruments
.iter()
.filter(|(_, (strike, _))| active_strikes.contains(strike))
.map(|(id, _)| *id)
.collect();
self.last_atm_strike =
atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
self.active_ids.iter().copied().collect()
}
#[must_use]
pub fn add_instrument(
&mut self,
instrument_id: InstrumentId,
strike: Price,
kind: OptionKind,
) -> bool {
if self.instruments.contains_key(&instrument_id) {
return false;
}
self.instruments.insert(instrument_id, (strike, kind));
let all_strikes = Self::sorted_strikes(&self.instruments);
let atm_price = self.atm_tracker.atm_price();
let active_strikes: HashSet<Price> = self
.strike_range
.resolve(atm_price, &all_strikes)
.into_iter()
.collect();
if active_strikes.contains(&strike) {
self.active_ids.insert(instrument_id);
}
true
}
fn sorted_strikes(instruments: &HashMap<InstrumentId, (Price, OptionKind)>) -> Vec<Price> {
let mut strikes: Vec<Price> = instruments.values().map(|(s, _)| *s).collect();
strikes.sort();
strikes.dedup();
strikes
}
fn find_closest_strike(all_strikes: &[Price], atm: Price) -> Option<Price> {
all_strikes
.iter()
.min_by(|a, b| {
let da = (a.as_f64() - atm.as_f64()).abs();
let db = (b.as_f64() - atm.as_f64()).abs();
da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
})
.copied()
}
pub fn update_quote(&mut self, quote: &QuoteTick) {
if self.is_expired(quote.ts_event) {
log::warn!(
"Dropping quote for {}, series {} expired at {}",
quote.instrument_id,
self.series_id,
self.series_id.expiration_ns,
);
return;
}
if !self.active_ids.contains("e.instrument_id) {
return;
}
if let Some(&(strike, kind)) = self.instruments.get("e.instrument_id) {
if quote.ts_event > self.max_ts_event {
self.max_ts_event = quote.ts_event;
}
let buffer = match kind {
OptionKind::Call => &mut self.call_buffer,
OptionKind::Put => &mut self.put_buffer,
};
match buffer.get_mut(&strike) {
Some(data) => data.quote = *quote,
None => {
let greeks = self.pending_greeks.remove("e.instrument_id);
buffer.insert(
strike,
OptionStrikeData {
quote: *quote,
greeks,
},
);
}
}
}
}
pub fn update_greeks(&mut self, greeks: &OptionGreeks) {
if self.is_expired(greeks.ts_event) {
log::warn!(
"Dropping greeks for {}, series {} expired at {}",
greeks.instrument_id,
self.series_id,
self.series_id.expiration_ns,
);
return;
}
if !self.active_ids.contains(&greeks.instrument_id) {
return;
}
if let Some(&(strike, kind)) = self.instruments.get(&greeks.instrument_id) {
let buffer = match kind {
OptionKind::Call => &mut self.call_buffer,
OptionKind::Put => &mut self.put_buffer,
};
match buffer.get_mut(&strike) {
Some(data) => data.greeks = Some(*greeks),
None => {
self.pending_greeks.insert(greeks.instrument_id, *greeks);
}
}
}
}
pub fn snapshot(&self, ts_init: UnixNanos) -> OptionChainSlice {
let atm_price = self.atm_tracker.atm_price();
let catalog_strikes = Self::sorted_strikes(&self.instruments);
let atm_strike = atm_price.and_then(|atm| Self::find_closest_strike(&catalog_strikes, atm));
let active_strikes: HashSet<Price> = self
.active_ids
.iter()
.filter_map(|id| self.instruments.get(id).map(|(s, _)| *s))
.collect();
let mut calls = BTreeMap::new();
for (strike, data) in &self.call_buffer {
if active_strikes.contains(strike) {
calls.insert(*strike, data.clone());
}
}
let mut puts = BTreeMap::new();
for (strike, data) in &self.put_buffer {
if active_strikes.contains(strike) {
puts.insert(*strike, data.clone());
}
}
let ts_event = if self.max_ts_event == UnixNanos::default() {
ts_init
} else {
self.max_ts_event
};
OptionChainSlice {
series_id: self.series_id,
atm_strike,
calls,
puts,
ts_event,
ts_init,
}
}
#[must_use]
pub fn is_buffer_empty(&self) -> bool {
self.call_buffer.is_empty() && self.put_buffer.is_empty()
}
#[must_use]
pub fn check_rebalance(&self, now_ns: UnixNanos) -> Option<RebalanceAction> {
if matches!(self.strike_range, StrikeRange::Fixed(_)) {
return None;
}
let atm_price = self.atm_tracker.atm_price()?;
let all_strikes = Self::sorted_strikes(&self.instruments);
let current_atm_strike = Self::find_closest_strike(&all_strikes, atm_price)?;
if self.last_atm_strike == Some(current_atm_strike) {
return None;
}
if let Some(last_strike) = self.last_atm_strike
&& self.hysteresis > 0.0
{
let last_f = last_strike.as_f64();
let atm_f = atm_price.as_f64();
let direction = atm_f - last_f;
let next_strike = if direction > 0.0 {
all_strikes.iter().find(|s| s.as_f64() > last_f)
} else {
all_strikes.iter().rev().find(|s| s.as_f64() < last_f)
};
if let Some(next) = next_strike {
let gap = (next.as_f64() - last_f).abs();
let threshold = last_f + direction.signum() * self.hysteresis * gap;
if direction > 0.0 && atm_f < threshold {
return None;
}
if direction < 0.0 && atm_f > threshold {
return None;
}
}
}
if self.cooldown_ns > 0
&& let Some(last_ts) = self.last_rebalance_ns
&& now_ns.as_u64().saturating_sub(last_ts.as_u64()) < self.cooldown_ns
{
return None;
}
let new_active_strikes: HashSet<Price> = self
.strike_range
.resolve(Some(atm_price), &all_strikes)
.into_iter()
.collect();
let new_active: HashSet<InstrumentId> = self
.instruments
.iter()
.filter(|(_, (s, _))| new_active_strikes.contains(s))
.map(|(id, _)| *id)
.collect();
let add = new_active.difference(&self.active_ids).copied().collect();
let remove = self.active_ids.difference(&new_active).copied().collect();
Some(RebalanceAction { add, remove })
}
pub fn apply_rebalance(&mut self, action: &RebalanceAction, now_ns: UnixNanos) {
for id in &action.add {
self.active_ids.insert(*id);
}
for id in &action.remove {
self.active_ids.remove(id);
}
let active_strikes: HashSet<Price> = self
.active_ids
.iter()
.filter_map(|id| self.instruments.get(id))
.map(|(s, _)| *s)
.collect();
self.call_buffer
.retain(|strike, _| active_strikes.contains(strike));
self.put_buffer
.retain(|strike, _| active_strikes.contains(strike));
self.pending_greeks
.retain(|id, _| self.active_ids.contains(id));
if let Some(atm) = self.atm_tracker.atm_price() {
let all_strikes = Self::sorted_strikes(&self.instruments);
self.last_atm_strike = Self::find_closest_strike(&all_strikes, atm);
}
self.last_rebalance_ns = Some(now_ns);
}
}
/// Difference between the current active set and the one resolved for the
/// current ATM price, as produced by `OptionChainAggregator::check_rebalance`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RebalanceAction {
/// Instruments to activate.
pub add: Vec<InstrumentId>,
/// Instruments to deactivate.
pub remove: Vec<InstrumentId>,
}
/// Test-only accessors and knobs for otherwise private state.
#[cfg(test)]
impl OptionChainAggregator {
    /// Number of strikes currently held in the call buffer.
    fn call_buffer_len(&self) -> usize {
        self.call_buffer.len()
    }

    /// Number of strikes currently held in the put buffer.
    fn put_buffer_len(&self) -> usize {
        self.put_buffer.len()
    }

    /// Greeks buffered for the call at `strike`, if both the entry and its greeks exist.
    fn get_call_greeks_from_buffer(&self, strike: &Price) -> Option<&OptionGreeks> {
        let entry = self.call_buffer.get(strike)?;
        entry.greeks.as_ref()
    }

    /// Strike recorded as closest to ATM at the last (re)computation.
    pub(crate) fn last_atm_strike(&self) -> Option<Price> {
        self.last_atm_strike
    }

    /// Overrides the rebalance hysteresis fraction.
    fn set_hysteresis(&mut self, h: f64) {
        self.hysteresis = h;
    }

    /// Overrides the rebalance cooldown, in nanoseconds.
    fn set_cooldown_ns(&mut self, ns: u64) {
        self.cooldown_ns = ns;
    }

    /// Number of greeks parked while waiting for a first quote.
    fn pending_greeks_count(&self) -> usize {
        self.pending_greeks.len()
    }
}
#[cfg(test)]
mod tests {
    use nautilus_model::{data::greeks::OptionGreekValues, identifiers::Venue, types::Quantity};
    use rstest::*;

    use super::*;

    /// Expiration of the series built by `make_series_id`, in Unix nanoseconds.
    const EXPIRATION_NS: u64 = 1_700_000_000_000_000_000;

    fn make_series_id() -> OptionSeriesId {
        OptionSeriesId::new(
            Venue::new("DERIBIT"),
            ustr::Ustr::from("BTC"),
            ustr::Ustr::from("BTC"),
            UnixNanos::from(EXPIRATION_NS),
        )
    }

    /// Builds a unit-size quote with `ts_event == ts_init == ts`.
    fn make_quote_at(instrument_id: InstrumentId, bid: &str, ask: &str, ts: u64) -> QuoteTick {
        QuoteTick::new(
            instrument_id,
            Price::from(bid),
            Price::from(ask),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(ts),
            UnixNanos::from(ts),
        )
    }

    fn make_quote(instrument_id: InstrumentId, bid: &str, ask: &str) -> QuoteTick {
        make_quote_at(instrument_id, bid, ask, 1)
    }

    /// A fixed "current time" that lies before `EXPIRATION_NS`.
    fn now() -> UnixNanos {
        UnixNanos::from(1_000_000_000_000_000_000u64)
    }

    /// Builds greeks for `instrument_id` with only `delta` set.
    fn make_delta_greeks(instrument_id: InstrumentId, delta: f64) -> OptionGreeks {
        OptionGreeks {
            instrument_id,
            greeks: OptionGreekValues {
                delta,
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Feeds an underlying price into the aggregator's ATM tracker.
    fn set_atm_via_greeks(agg: &mut OptionChainAggregator, price: f64) {
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: Some(price),
            ..Default::default()
        };
        agg.atm_tracker_mut().update_from_option_greeks(&greeks);
    }

    /// Moves the ATM price and applies the resulting rebalance at `ts`.
    ///
    /// Panics if `check_rebalance` proposes nothing.
    fn rebalance_to(agg: &mut OptionChainAggregator, price: f64, ts: UnixNanos) {
        set_atm_via_greeks(agg, price);
        let action = agg.check_rebalance(ts).unwrap();
        agg.apply_rebalance(&action, ts);
    }

    /// Single-strike (50000) call + put aggregator with a fixed strike range.
    fn make_aggregator() -> (OptionChainAggregator, InstrumentId, InstrumentId) {
        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        let put_id = InstrumentId::from("BTC-20240101-50000-P.DERIBIT");
        let strike = Price::from("50000");

        let mut instrument_map = HashMap::new();
        instrument_map.insert(call_id, (strike, OptionKind::Call));
        instrument_map.insert(put_id, (strike, OptionKind::Put));

        let agg = OptionChainAggregator::new(
            make_series_id(),
            StrikeRange::Fixed(vec![strike]),
            AtmTracker::new(),
            instrument_map,
        );
        (agg, call_id, put_id)
    }

    /// ATM-relative (1 above / 1 below) aggregator over `strikes`, cooldown disabled.
    fn make_atm_relative_aggregator(
        strikes: &[u32],
        with_puts: bool,
        hysteresis: f64,
    ) -> OptionChainAggregator {
        let mut instruments = HashMap::new();
        for s in strikes {
            let strike = Price::from(&s.to_string());
            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
            instruments.insert(call_id, (strike, OptionKind::Call));
            if with_puts {
                let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
                instruments.insert(put_id, (strike, OptionKind::Put));
            }
        }

        let mut agg = OptionChainAggregator::new(
            make_series_id(),
            StrikeRange::AtmRelative {
                strikes_above: 1,
                strikes_below: 1,
            },
            AtmTracker::new(),
            instruments,
        );
        agg.set_hysteresis(hysteresis);
        agg.set_cooldown_ns(0);
        agg
    }

    /// Five strikes, calls and puts, hysteresis and cooldown disabled.
    fn make_multi_strike_aggregator() -> OptionChainAggregator {
        make_atm_relative_aggregator(&[45000, 47500, 50000, 52500, 55000], true, 0.0)
    }

    /// Three strikes, calls only, hysteresis 0.6, cooldown disabled.
    fn make_call_only_aggregator() -> OptionChainAggregator {
        make_atm_relative_aggregator(&[47500, 50000, 52500], false, 0.6)
    }

    #[rstest]
    fn test_aggregator_instrument_ids() {
        let (agg, call_id, put_id) = make_aggregator();
        let ids = agg.instrument_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&call_id));
        assert!(ids.contains(&put_id));
    }

    #[rstest]
    fn test_aggregator_update_quote() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);
        assert_eq!(agg.put_buffer_len(), 0);
    }

    #[rstest]
    fn test_aggregator_update_greeks() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);

        agg.update_greeks(&make_delta_greeks(call_id, 0.55));

        let strike = Price::from("50000");
        let data = agg.get_call_greeks_from_buffer(&strike);
        assert!(data.is_some());
        assert_eq!(data.unwrap().delta, 0.55);
    }

    #[rstest]
    fn test_aggregator_snapshot_preserves_state() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);

        let slice = agg.snapshot(UnixNanos::from(100u64));
        assert_eq!(slice.call_count(), 1);
        assert_eq!(slice.ts_init, UnixNanos::from(100u64));
        assert!(!agg.is_buffer_empty());

        let slice2 = agg.snapshot(UnixNanos::from(200u64));
        assert_eq!(slice2.call_count(), 1);
        assert_eq!(slice2.ts_init, UnixNanos::from(200u64));
    }

    #[rstest]
    fn test_aggregator_ignores_unknown_instrument() {
        let (mut agg, _, _) = make_aggregator();
        let unknown_id = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        let quote = make_quote(unknown_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert!(agg.is_buffer_empty());
    }

    #[rstest]
    fn test_check_rebalance_returns_none() {
        let (agg, _, _) = make_aggregator();
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_fixed_always_none() {
        let (mut agg, _, _) = make_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_no_atm_returns_none() {
        let agg = make_multi_strike_aggregator();
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_atm_unchanged_returns_none() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());

        set_atm_via_greeks(&mut agg, 50200.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_detects_atm_shift() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        assert_eq!(agg.instrument_ids().len(), 6);

        set_atm_via_greeks(&mut agg, 55000.0);
        let action2 = agg.check_rebalance(now()).unwrap();
        assert!(!action2.add.is_empty() || !action2.remove.is_empty());
    }

    #[rstest]
    fn test_apply_rebalance_updates_instrument_map() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        let active_ids = agg.instrument_ids();
        assert_eq!(active_ids.len(), 6);

        rebalance_to(&mut agg, 55000.0, now());
        let active_ids2 = agg.instrument_ids();
        assert_eq!(active_ids2.len(), 4);
    }

    #[rstest]
    fn test_apply_rebalance_cleans_buffers() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());

        let call_47500 = InstrumentId::from("BTC-20240101-47500-C.DERIBIT");
        let quote = make_quote(call_47500, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        rebalance_to(&mut agg, 55000.0, now());
        assert_eq!(agg.call_buffer_len(), 0);
    }

    #[rstest]
    fn test_initial_active_set_empty_when_no_atm() {
        let agg = make_multi_strike_aggregator();
        assert_eq!(agg.instrument_ids().len(), 0);
        assert_eq!(agg.all_instrument_ids().len(), 10);
    }

    #[rstest]
    fn test_catalog_vs_active_separation() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        assert_eq!(agg.instruments().len(), 10);
        assert_eq!(agg.instrument_ids().len(), 6);
    }

    #[rstest]
    fn test_add_instrument_already_known() {
        let (mut agg, call_id, _) = make_aggregator();
        let strike = Price::from("50000");
        let count_before = agg.instruments().len();
        let result = agg.add_instrument(call_id, strike, OptionKind::Call);
        assert!(!result);
        assert_eq!(agg.instruments().len(), count_before);
    }

    #[rstest]
    fn test_add_instrument_new_in_active_range() {
        let (mut agg, _, _) = make_aggregator();
        let new_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
        let strike = Price::from("50000");
        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
        assert!(result);
        assert_eq!(agg.instruments().len(), 3);
        assert!(agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_add_instrument_new_out_of_range() {
        let (mut agg, _, _) = make_aggregator();
        let new_id = InstrumentId::from("BTC-20240101-60000-C.DERIBIT");
        let strike = Price::from("60000");
        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
        assert!(result);
        assert_eq!(agg.instruments().len(), 3);
        assert!(!agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_add_instrument_available_for_rebalance() {
        let mut agg = make_multi_strike_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        assert_eq!(agg.instrument_ids().len(), 6);

        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
        let strike = Price::from("57500");
        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
        assert!(result);
        assert!(!agg.active_ids().contains(&new_id));

        rebalance_to(&mut agg, 57500.0, now());
        assert!(agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_hysteresis_blocks_small_movement() {
        let mut agg = make_call_only_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));

        set_atm_via_greeks(&mut agg, 51000.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_hysteresis_allows_large_movement() {
        let mut agg = make_call_only_aggregator();
        rebalance_to(&mut agg, 50000.0, now());

        set_atm_via_greeks(&mut agg, 52000.0);
        assert!(agg.check_rebalance(now()).is_some());
    }

    #[rstest]
    fn test_zero_hysteresis_disables_guard() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(0);
        rebalance_to(&mut agg, 50000.0, now());

        set_atm_via_greeks(&mut agg, 52500.0);
        assert!(agg.check_rebalance(now()).is_some());
    }

    #[rstest]
    fn test_cooldown_blocks_rapid_rebalance() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(5_000_000_000);
        let t0 = now();
        rebalance_to(&mut agg, 50000.0, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        let t1 = UnixNanos::from(t0.as_u64() + 1_000_000_000);
        assert!(agg.check_rebalance(t1).is_none());
    }

    #[rstest]
    fn test_cooldown_allows_after_elapsed() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(5_000_000_000);
        let t0 = now();
        rebalance_to(&mut agg, 50000.0, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        let t1 = UnixNanos::from(t0.as_u64() + 6_000_000_000);
        assert!(agg.check_rebalance(t1).is_some());
    }

    #[rstest]
    fn test_zero_cooldown_disables_guard() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(0);
        let t0 = now();
        rebalance_to(&mut agg, 50000.0, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        assert!(agg.check_rebalance(t0).is_some());
    }

    #[rstest]
    fn test_pending_greeks_consumed_on_first_quote() {
        let (mut agg, call_id, _) = make_aggregator();
        agg.update_greeks(&make_delta_greeks(call_id, 0.55));
        assert_eq!(agg.pending_greeks_count(), 1);

        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.pending_greeks_count(), 0);

        let strike = Price::from("50000");
        let data = agg.get_call_greeks_from_buffer(&strike);
        assert!(data.is_some());
        assert_eq!(data.unwrap().delta, 0.55);
    }

    #[rstest]
    fn test_snapshot_ts_event_reflects_max_quote_timestamp() {
        let (mut agg, call_id, put_id) = make_aggregator();

        let quote1 = make_quote_at(call_id, "100.00", "101.00", 500);
        agg.update_quote(&quote1);

        let quote2 = make_quote_at(put_id, "50.00", "51.00", 800);
        agg.update_quote(&quote2);

        let slice = agg.snapshot(UnixNanos::from(1000u64));
        assert_eq!(slice.ts_event, UnixNanos::from(800u64));
        assert_eq!(slice.ts_init, UnixNanos::from(1000u64));
    }

    #[rstest]
    fn test_snapshot_ts_event_fallback_when_no_quotes() {
        let (agg, _, _) = make_aggregator();
        let slice = agg.snapshot(UnixNanos::from(1000u64));
        assert_eq!(slice.ts_event, UnixNanos::from(1000u64));
    }

    #[rstest]
    fn test_snapshot_retains_buffered_data_during_hysteresis_window() {
        let mut agg = make_call_only_aggregator();
        rebalance_to(&mut agg, 50000.0, now());
        assert_eq!(agg.instrument_ids().len(), 3);

        let q1 = make_quote(
            InstrumentId::from("BTC-20240101-47500-C.DERIBIT"),
            "3000.00",
            "3100.00",
        );
        let q2 = make_quote(
            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            "1500.00",
            "1600.00",
        );
        let q3 = make_quote(
            InstrumentId::from("BTC-20240101-52500-C.DERIBIT"),
            "500.00",
            "600.00",
        );
        agg.update_quote(&q1);
        agg.update_quote(&q2);
        agg.update_quote(&q3);
        assert_eq!(agg.call_buffer_len(), 3);

        set_atm_via_greeks(&mut agg, 51000.0);
        assert!(agg.check_rebalance(now()).is_none());

        let slice = agg.snapshot(UnixNanos::from(100u64));
        assert_eq!(slice.call_count(), 3);
    }

    #[rstest]
    fn test_remove_instrument_from_catalog() {
        let (mut agg, call_id, put_id) = make_aggregator();
        assert_eq!(agg.instruments().len(), 2);

        let removed = agg.remove_instrument(&call_id);
        assert!(removed);
        assert_eq!(agg.instruments().len(), 1);
        assert!(!agg.active_ids().contains(&call_id));
        assert!(agg.instruments().contains_key(&put_id));
    }

    #[rstest]
    fn test_remove_instrument_cleans_buffer() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.call_buffer_len(), 0);
    }

    #[rstest]
    fn test_remove_instrument_preserves_sibling_buffer() {
        let (mut agg, call_id, _) = make_aggregator();
        let sibling_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
        let strike = Price::from("50000");
        let _ = agg.add_instrument(sibling_id, strike, OptionKind::Call);

        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.call_buffer_len(), 1);
        assert!(agg.instruments().contains_key(&sibling_id));
    }

    #[rstest]
    fn test_remove_instrument_unknown_noop() {
        let (mut agg, _, _) = make_aggregator();
        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        assert!(!agg.remove_instrument(&unknown));
        assert_eq!(agg.instruments().len(), 2);
    }

    #[rstest]
    fn test_remove_instrument_cleans_pending_greeks() {
        let (mut agg, call_id, _) = make_aggregator();
        agg.update_greeks(&make_delta_greeks(call_id, 0.55));
        assert_eq!(agg.pending_greeks_count(), 1);

        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.pending_greeks_count(), 0);
    }

    #[rstest]
    fn test_is_catalog_empty_after_full_removal() {
        let (mut agg, call_id, put_id) = make_aggregator();
        assert!(!agg.is_catalog_empty());

        let _ = agg.remove_instrument(&call_id);
        assert!(!agg.is_catalog_empty());

        let _ = agg.remove_instrument(&put_id);
        assert!(agg.is_catalog_empty());
    }

    #[rstest]
    fn test_expired_quote_is_dropped() {
        let (mut agg, call_id, _) = make_aggregator();
        // Stamped exactly at expiry, which already counts as expired.
        let expired_quote = make_quote_at(call_id, "100.00", "101.00", EXPIRATION_NS);
        agg.update_quote(&expired_quote);
        assert!(agg.is_buffer_empty());
    }

    #[rstest]
    fn test_expired_greeks_are_dropped() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        let greeks = OptionGreeks {
            ts_event: UnixNanos::from(EXPIRATION_NS),
            ..make_delta_greeks(call_id, 0.55)
        };
        agg.update_greeks(&greeks);

        let strike = Price::from("50000");
        assert!(agg.get_call_greeks_from_buffer(&strike).is_none());
    }
}