use std::{cell::RefCell, collections::HashMap, rc::Rc};
use nautilus_common::{
cache::Cache,
clock::Clock,
messages::data::{
SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
UnsubscribeQuotes,
},
msgbus::{self, MStr, Topic, TypedHandler, switchboard},
timer::{TimeEvent, TimeEventCallback},
};
use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
use nautilus_model::{
data::{QuoteTick, option_chain::OptionGreeks},
enums::OptionKind,
identifiers::{InstrumentId, OptionSeriesId, Venue},
instruments::Instrument,
types::Price,
};
use ustr::Ustr;
use super::{
AtmTracker, OptionChainAggregator,
handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
};
use crate::{
client::DataClientAdapter,
engine::{DeferredCommand, DeferredCommandQueue},
};
#[derive(Debug)]
pub struct OptionChainManager {
aggregator: OptionChainAggregator,
topic: MStr<Topic>,
quote_handlers: Vec<TypedHandler<QuoteTick>>,
greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
timer_name: Option<Ustr>,
msgbus_priority: u8,
bootstrapped: bool,
deferred_cmd_queue: DeferredCommandQueue,
clock: Rc<RefCell<dyn Clock>>,
raw_mode: bool,
}
impl OptionChainManager {
#[allow(clippy::too_many_arguments)]
pub(crate) fn create_and_setup(
series_id: OptionSeriesId,
cache: &Rc<RefCell<Cache>>,
cmd: &SubscribeOptionChain,
clock: &Rc<RefCell<dyn Clock>>,
msgbus_priority: u8,
client: Option<&mut DataClientAdapter>,
initial_atm_price: Option<Price>,
deferred_cmd_queue: DeferredCommandQueue,
) -> Rc<RefCell<Self>> {
let topic = switchboard::get_option_chain_topic(series_id);
let instruments = Self::resolve_instruments(cache, &series_id);
let mut tracker = AtmTracker::new();
if let Some((strike, _)) = instruments.values().next() {
tracker.set_forward_precision(strike.precision);
}
if let Some(price) = initial_atm_price {
tracker.set_initial_price(price);
log::info!("Pre-populated ATM with forward price: {price}");
}
let aggregator =
OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
let active_instrument_ids = aggregator.instrument_ids();
let all_instrument_ids = aggregator.all_instrument_ids();
let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
let raw_mode = cmd.snapshot_interval_ms.is_none();
let manager = Self {
aggregator,
topic,
quote_handlers: Vec::new(),
greeks_handlers: Vec::new(),
timer_name: None,
msgbus_priority,
bootstrapped,
deferred_cmd_queue,
clock: clock.clone(),
raw_mode,
};
let manager_rc = Rc::new(RefCell::new(manager));
let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
&manager_rc,
&active_instrument_ids,
series_id,
msgbus_priority,
);
let greeks_handlers = Self::register_greeks_handlers(
&manager_rc,
&active_instrument_ids,
series_id,
msgbus_priority,
);
Self::forward_client_subscriptions(
client,
&active_instrument_ids,
cmd,
series_id.venue,
clock,
);
let timer_name = cmd
.snapshot_interval_ms
.map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
{
let mut mgr = manager_rc.borrow_mut();
mgr.quote_handlers = quote_handlers;
mgr.greeks_handlers = greeks_handlers;
mgr.timer_name = timer_name;
}
let mode_str = match cmd.snapshot_interval_ms {
Some(ms) => format!("interval={ms}ms"),
None => "mode=raw".to_string(),
};
log::info!(
"Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
active_instrument_ids.len(),
all_instrument_ids.len(),
);
manager_rc
}
fn register_quote_handlers(
manager_rc: &Rc<RefCell<Self>>,
instrument_ids: &[InstrumentId],
series_id: OptionSeriesId,
priority: u8,
) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
handlers.push(quote_handler.clone());
for instrument_id in instrument_ids {
let topic = switchboard::get_quotes_topic(*instrument_id);
msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
handlers.push(quote_handler.clone());
}
(handlers, quote_handler)
}
fn register_greeks_handlers(
manager_rc: &Rc<RefCell<Self>>,
instrument_ids: &[InstrumentId],
series_id: OptionSeriesId,
priority: u8,
) -> Vec<TypedHandler<OptionGreeks>> {
let greeks_handler =
TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
handlers.push(greeks_handler.clone());
for instrument_id in instrument_ids {
let topic = switchboard::get_option_greeks_topic(*instrument_id);
msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
handlers.push(greeks_handler.clone());
}
handlers
}
fn forward_client_subscriptions(
client: Option<&mut DataClientAdapter>,
instrument_ids: &[InstrumentId],
cmd: &SubscribeOptionChain,
venue: Venue,
clock: &Rc<RefCell<dyn Clock>>,
) {
let ts_init = clock.borrow().timestamp_ns();
let Some(client) = client else {
log::error!(
"Cannot forward option chain subscriptions: no client found for venue={venue}",
);
return;
};
for instrument_id in instrument_ids {
client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
instrument_id: *instrument_id,
client_id: cmd.client_id,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}));
client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
instrument_id: *instrument_id,
client_id: cmd.client_id,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}));
client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
SubscribeInstrumentStatus {
instrument_id: *instrument_id,
client_id: cmd.client_id,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
},
));
}
log::info!(
"Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
instrument_ids.len(),
);
}
fn setup_timer(
manager_rc: &Rc<RefCell<Self>>,
series_id: OptionSeriesId,
interval_ms: u64,
clock: &Rc<RefCell<dyn Clock>>,
) -> Ustr {
let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
let publisher = OptionChainSlicePublisher::new(manager_rc);
let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
let now_ns = clock.borrow().timestamp_ns().as_u64();
let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
let callback = TimeEventCallback::from(callback_fn);
clock
.borrow_mut()
.set_timer_ns(
&timer_name,
interval_ns,
Some(start_time_ns.into()),
None,
Some(callback),
None,
None,
)
.expect(FAILED);
timer_name
}
#[must_use]
pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
self.aggregator.all_instrument_ids()
}
#[must_use]
pub fn venue(&self) -> Venue {
self.aggregator.series_id().venue
}
pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
let instrument_ids = self.aggregator.instrument_ids();
if let Some(handler) = self.quote_handlers.first() {
for instrument_id in &instrument_ids {
let topic = switchboard::get_quotes_topic(*instrument_id);
msgbus::unsubscribe_quotes(topic.into(), handler);
}
}
if let Some(handler) = self.greeks_handlers.first() {
for instrument_id in &instrument_ids {
let topic = switchboard::get_option_greeks_topic(*instrument_id);
msgbus::unsubscribe_option_greeks(topic.into(), handler);
}
}
if let Some(timer_name) = self.timer_name.take() {
let mut clk = clock.borrow_mut();
if clk.timer_exists(&timer_name) {
clk.cancel_timer(&timer_name);
}
}
self.quote_handlers.clear();
self.greeks_handlers.clear();
}
pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
self.aggregator
.atm_tracker_mut()
.update_from_option_greeks(greeks);
self.aggregator.update_greeks(greeks);
self.maybe_bootstrap();
if self.raw_mode
&& self.bootstrapped
&& self.aggregator.active_ids().contains(&greeks.instrument_id)
{
self.publish_slice(greeks.ts_event);
}
}
pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
let was_active = self.aggregator.active_ids().contains(instrument_id);
if !self.aggregator.remove_instrument(instrument_id) {
return self.aggregator.is_catalog_empty();
}
if was_active {
if let Some(qh) = self.quote_handlers.first() {
let topic = switchboard::get_quotes_topic(*instrument_id);
msgbus::unsubscribe_quotes(topic.into(), qh);
}
if let Some(gh) = self.greeks_handlers.first() {
let topic = switchboard::get_option_greeks_topic(*instrument_id);
msgbus::unsubscribe_option_greeks(topic.into(), gh);
}
self.push_unsubscribe_commands(*instrument_id);
}
log::info!(
"Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
self.aggregator.series_id(),
self.aggregator.instruments().len(),
);
self.aggregator.is_catalog_empty()
}
pub fn handle_quote(&mut self, quote: &QuoteTick) {
self.aggregator.update_quote(quote);
self.maybe_bootstrap();
if self.raw_mode
&& self.bootstrapped
&& self.aggregator.active_ids().contains("e.instrument_id)
{
self.publish_slice(quote.ts_event);
}
}
fn maybe_bootstrap(&mut self) {
if self.bootstrapped {
return;
}
if self.aggregator.atm_tracker().atm_price().is_none() {
return;
}
let active_ids = self.aggregator.recompute_active_set();
self.register_handlers_for_instruments_bulk(&active_ids);
for &id in &active_ids {
self.push_subscribe_commands(id);
}
self.bootstrapped = true;
log::info!(
"Bootstrapped option chain for {} ({} active instruments)",
self.aggregator.series_id(),
active_ids.len(),
);
}
fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
for &id in instrument_ids {
self.register_handlers_for_instrument(id);
}
}
pub fn add_instrument(
&mut self,
instrument_id: InstrumentId,
strike: Price,
kind: OptionKind,
client: Option<&mut DataClientAdapter>,
clock: &Rc<RefCell<dyn Clock>>,
) -> bool {
if !self.aggregator.add_instrument(instrument_id, strike, kind) {
return false;
}
if self.aggregator.active_ids().contains(&instrument_id) {
self.register_handlers_for_instrument(instrument_id);
}
let venue = self.aggregator.series_id().venue;
Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
log::info!(
"Added instrument {instrument_id} to option chain {} (active={})",
self.aggregator.series_id(),
self.aggregator.active_ids().contains(&instrument_id),
);
true
}
fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
if let Some(qh) = self.quote_handlers.first().cloned() {
let topic = switchboard::get_quotes_topic(instrument_id);
msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
}
if let Some(gh) = self.greeks_handlers.first().cloned() {
let topic = switchboard::get_option_greeks_topic(instrument_id);
msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
}
}
fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
let venue = self.aggregator.series_id().venue;
let ts_init = self.clock.borrow().timestamp_ns();
let mut queue = self.deferred_cmd_queue.borrow_mut();
queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
SubscribeQuotes {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
},
)));
queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
SubscribeOptionGreeks {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
},
)));
queue.push_back(DeferredCommand::Subscribe(
SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}),
));
}
fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
let venue = self.aggregator.series_id().venue;
let ts_init = self.clock.borrow().timestamp_ns();
let mut queue = self.deferred_cmd_queue.borrow_mut();
queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
UnsubscribeQuotes {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
},
)));
queue.push_back(DeferredCommand::Unsubscribe(
UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}),
));
queue.push_back(DeferredCommand::Unsubscribe(
UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}),
));
}
fn forward_instrument_subscriptions(
client: Option<&mut DataClientAdapter>,
instrument_id: InstrumentId,
venue: Venue,
clock: &Rc<RefCell<dyn Clock>>,
) {
let Some(client) = client else {
log::error!(
"Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
);
return;
};
let ts_init = clock.borrow().timestamp_ns();
client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}));
client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
}));
client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
SubscribeInstrumentStatus {
instrument_id,
client_id: None,
venue: Some(venue),
command_id: UUID4::new(),
ts_init,
correlation_id: None,
params: None,
},
));
}
fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
let Some(action) = self.aggregator.check_rebalance(now_ns) else {
return;
};
if let Some(qh) = self.quote_handlers.first() {
for id in &action.remove {
msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
}
}
if let Some(gh) = self.greeks_handlers.first() {
for id in &action.remove {
msgbus::unsubscribe_option_greeks(
switchboard::get_option_greeks_topic(*id).into(),
gh,
);
}
}
if let Some(qh) = self.quote_handlers.first().cloned() {
for id in &action.add {
msgbus::subscribe_quotes(
switchboard::get_quotes_topic(*id).into(),
qh.clone(),
Some(self.msgbus_priority),
);
}
}
if let Some(gh) = self.greeks_handlers.first().cloned() {
for id in &action.add {
msgbus::subscribe_option_greeks(
switchboard::get_option_greeks_topic(*id).into(),
gh.clone(),
Some(self.msgbus_priority),
);
}
}
for &id in &action.add {
self.push_subscribe_commands(id);
}
for &id in &action.remove {
self.push_unsubscribe_commands(id);
}
if !action.add.is_empty() || !action.remove.is_empty() {
log::info!(
"Rebalanced option chain for {}: +{} -{} instruments",
self.aggregator.series_id(),
action.add.len(),
action.remove.len(),
);
}
self.aggregator.apply_rebalance(&action, now_ns);
}
pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
if self.aggregator.is_expired(ts) {
self.deferred_cmd_queue
.borrow_mut()
.push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
return;
}
self.maybe_rebalance(ts);
let series_id = self.aggregator.series_id();
let slice = self.aggregator.snapshot(ts);
if slice.is_empty() {
log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
return;
}
log::debug!(
"Publishing OptionChainSlice for {} (calls={}, puts={})",
series_id,
slice.call_count(),
slice.put_count(),
);
msgbus::publish_option_chain(self.topic, &slice);
}
fn resolve_instruments(
cache: &Rc<RefCell<Cache>>,
series_id: &OptionSeriesId,
) -> HashMap<InstrumentId, (Price, OptionKind)> {
let cache = cache.borrow();
let mut map = HashMap::new();
for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
let Some(expiration) = instrument.expiration_ns() else {
continue;
};
if expiration != series_id.expiration_ns {
continue;
}
if instrument.settlement_currency().code != series_id.settlement_currency {
continue;
}
let Some(strike) = instrument.strike_price() else {
continue;
};
let Some(kind) = instrument.option_kind() else {
continue;
};
map.insert(instrument.id(), (strike, kind));
}
map
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use nautilus_common::clock::TestClock;
use nautilus_core::UnixNanos;
use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
use rstest::*;
use super::*;
fn make_series_id() -> OptionSeriesId {
OptionSeriesId::new(
Venue::new("DERIBIT"),
ustr::Ustr::from("BTC"),
ustr::Ustr::from("BTC"),
UnixNanos::from(1_700_000_000_000_000_000u64),
)
}
fn make_test_queue() -> DeferredCommandQueue {
Rc::new(RefCell::new(VecDeque::new()))
}
fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
let series_id = make_series_id();
let topic = switchboard::get_option_chain_topic(series_id);
let tracker = AtmTracker::new();
let aggregator = OptionChainAggregator::new(
series_id,
StrikeRange::Fixed(vec![]),
tracker,
HashMap::new(),
);
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let queue = make_test_queue();
let manager = OptionChainManager {
aggregator,
topic,
quote_handlers: Vec::new(),
greeks_handlers: Vec::new(),
timer_name: None,
msgbus_priority: 0,
bootstrapped: true,
deferred_cmd_queue: queue.clone(),
clock,
raw_mode: false,
};
(manager, queue)
}
#[rstest]
fn test_manager_handle_quote_no_instrument() {
let (mut manager, _queue) = make_manager();
let quote = QuoteTick::new(
InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
Price::from("100.00"),
Price::from("101.00"),
Quantity::from("1.0"),
Quantity::from("1.0"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
);
manager.handle_quote("e);
}
#[rstest]
fn test_manager_publish_slice_empty() {
let (mut manager, _queue) = make_manager();
manager.publish_slice(UnixNanos::from(100u64));
}
#[rstest]
fn test_manager_teardown_no_handlers() {
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let (mut manager, _queue) = make_manager();
manager.teardown(&clock);
assert!(manager.quote_handlers.is_empty());
}
fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
let series_id = make_series_id();
let topic = switchboard::get_option_chain_topic(series_id);
let strikes = [45000, 47500, 50000, 52500, 55000];
let mut instruments = HashMap::new();
for s in &strikes {
let strike = Price::from(&s.to_string());
let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
instruments.insert(call_id, (strike, OptionKind::Call));
instruments.insert(put_id, (strike, OptionKind::Put));
}
let tracker = AtmTracker::new();
let aggregator = OptionChainAggregator::new(
series_id,
StrikeRange::AtmRelative {
strikes_above: 1,
strikes_below: 1,
},
tracker,
instruments,
);
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let queue = make_test_queue();
let manager = OptionChainManager {
aggregator,
topic,
quote_handlers: Vec::new(),
greeks_handlers: Vec::new(),
timer_name: None,
msgbus_priority: 0,
bootstrapped: false,
deferred_cmd_queue: queue.clone(),
clock,
raw_mode: false,
};
(manager, queue)
}
fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
use nautilus_model::data::option_chain::OptionGreeks;
let greeks = OptionGreeks {
instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
underlying_price: Some(50000.0),
..Default::default()
};
manager.handle_greeks(&greeks);
}
#[rstest]
fn test_manager_publish_slice_triggers_rebalance() {
let (mut manager, queue) = make_option_chain_manager();
assert_eq!(manager.aggregator.instrument_ids().len(), 0);
bootstrap_via_greeks(&mut manager);
assert!(manager.bootstrapped);
assert_eq!(manager.aggregator.instrument_ids().len(), 6);
assert_eq!(queue.borrow().len(), 18);
manager.publish_slice(UnixNanos::from(100u64));
assert!(manager.aggregator.last_atm_strike().is_some());
}
#[rstest]
fn test_manager_add_instrument_new() {
let (mut manager, _queue) = make_option_chain_manager();
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
let strike = Price::from("57500");
let count_before = manager.aggregator.instruments().len();
let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);
assert!(result);
assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
}
#[rstest]
fn test_manager_add_instrument_already_known() {
let (mut manager, _queue) = make_option_chain_manager();
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
let strike = Price::from("50000");
let count_before = manager.aggregator.instruments().len();
let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);
assert!(!result);
assert_eq!(manager.aggregator.instruments().len(), count_before);
}
#[rstest]
fn test_manager_deferred_bootstrap_on_first_atm() {
let (mut manager, queue) = make_option_chain_manager();
assert!(!manager.bootstrapped);
assert_eq!(manager.aggregator.instrument_ids().len(), 0);
assert!(queue.borrow().is_empty());
bootstrap_via_greeks(&mut manager);
assert!(manager.bootstrapped);
assert_eq!(manager.aggregator.instrument_ids().len(), 6); assert_eq!(queue.borrow().len(), 18);
assert!(
queue
.borrow()
.iter()
.all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
);
}
#[rstest]
fn test_manager_bootstrap_idempotent() {
use nautilus_model::data::option_chain::OptionGreeks;
let (mut manager, _queue) = make_option_chain_manager();
bootstrap_via_greeks(&mut manager);
assert!(manager.bootstrapped);
let count = manager.aggregator.instrument_ids().len();
let greeks2 = OptionGreeks {
instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
underlying_price: Some(50200.0),
..Default::default()
};
manager.handle_greeks(&greeks2);
assert_eq!(manager.aggregator.instrument_ids().len(), count);
}
#[rstest]
fn test_manager_fixed_range_bootstrapped_immediately() {
let (manager, queue) = make_manager();
assert!(manager.bootstrapped);
assert!(queue.borrow().is_empty());
}
#[rstest]
fn test_manager_forward_price_bootstrap_from_greeks() {
use nautilus_model::data::option_chain::OptionGreeks;
let (mut manager, _queue) = make_option_chain_manager();
assert!(!manager.bootstrapped);
let greeks = OptionGreeks {
instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
underlying_price: Some(50000.0),
..Default::default()
};
manager.handle_greeks(&greeks);
assert!(manager.bootstrapped);
assert_eq!(manager.aggregator.instrument_ids().len(), 6);
}
#[rstest]
fn test_manager_forward_price_no_bootstrap_without_underlying() {
use nautilus_model::data::option_chain::OptionGreeks;
let (mut manager, _queue) = make_option_chain_manager();
assert!(!manager.bootstrapped);
let greeks = OptionGreeks {
instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
underlying_price: None,
..Default::default()
};
manager.handle_greeks(&greeks);
assert!(!manager.bootstrapped);
}
#[rstest]
fn test_handle_instrument_expired_removes_from_aggregator() {
let (mut manager, queue) = make_option_chain_manager();
bootstrap_via_greeks(&mut manager);
assert!(manager.bootstrapped);
let initial_count = manager.aggregator.instruments().len();
queue.borrow_mut().clear();
let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
let is_empty = manager.handle_instrument_expired(&expired_id);
assert!(!is_empty);
assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
assert!(!manager.aggregator.active_ids().contains(&expired_id));
}
#[rstest]
fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
let (mut manager, queue) = make_option_chain_manager();
bootstrap_via_greeks(&mut manager);
queue.borrow_mut().clear();
let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
manager.handle_instrument_expired(&expired_id);
let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
assert_eq!(cmds.len(), 3);
assert!(
cmds.iter()
.all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
);
}
#[rstest]
fn test_handle_instrument_expired_returns_true_when_last() {
let series_id = make_series_id();
let topic = switchboard::get_option_chain_topic(series_id);
let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
let strike = Price::from("50000");
let mut instruments = HashMap::new();
instruments.insert(call_id, (strike, OptionKind::Call));
let tracker = AtmTracker::new();
let aggregator = OptionChainAggregator::new(
series_id,
StrikeRange::Fixed(vec![strike]),
tracker,
instruments,
);
let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
let queue = make_test_queue();
let mut manager = OptionChainManager {
aggregator,
topic,
quote_handlers: Vec::new(),
greeks_handlers: Vec::new(),
timer_name: None,
msgbus_priority: 0,
bootstrapped: true,
deferred_cmd_queue: queue,
clock,
raw_mode: false,
};
let is_empty = manager.handle_instrument_expired(&call_id);
assert!(is_empty);
assert!(manager.aggregator.is_catalog_empty());
}
#[rstest]
fn test_handle_instrument_expired_unknown_noop() {
let (mut manager, queue) = make_manager();
queue.borrow_mut().clear();
let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
let is_empty = manager.handle_instrument_expired(&unknown);
assert!(is_empty);
assert!(queue.borrow().is_empty()); }
#[rstest]
fn test_publish_slice_pushes_expire_series_when_expired() {
let (mut manager, queue) = make_option_chain_manager();
bootstrap_via_greeks(&mut manager);
queue.borrow_mut().clear();
let expiry_ns = manager.aggregator.series_id().expiration_ns;
manager.publish_slice(expiry_ns);
let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
assert_eq!(cmds.len(), 1);
assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
}
#[rstest]
fn test_expired_instrument_unsubscribes_include_instrument_status() {
let (mut manager, queue) = make_option_chain_manager();
bootstrap_via_greeks(&mut manager);
queue.borrow_mut().clear();
let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
manager.handle_instrument_expired(&expired_id);
let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
let status_unsubs = cmds
.iter()
.filter(|c| {
matches!(
c,
DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
)
})
.count();
assert_eq!(status_unsubs, 1);
}
}