1use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26 cache::Cache,
27 clock::Clock,
28 messages::data::{
29 SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30 SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31 UnsubscribeQuotes,
32 },
33 msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34 timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38 data::{QuoteTick, option_chain::OptionGreeks},
39 enums::OptionKind,
40 identifiers::{InstrumentId, OptionSeriesId, Venue},
41 instruments::Instrument,
42 types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47 AtmTracker, OptionChainAggregator,
48 handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51 client::DataClientAdapter,
52 engine::{DeferredCommand, DeferredCommandQueue},
53};
54
55#[derive(Debug)]
61pub struct OptionChainManager {
62 aggregator: OptionChainAggregator,
63 topic: MStr<Topic>,
64 quote_handlers: Vec<TypedHandler<QuoteTick>>,
65 greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
66 timer_name: Option<Ustr>,
67 msgbus_priority: u8,
68 bootstrapped: bool,
70 deferred_cmd_queue: DeferredCommandQueue,
72 clock: Rc<RefCell<dyn Clock>>,
74 raw_mode: bool,
76}
77
78impl OptionChainManager {
79 #[allow(clippy::too_many_arguments)]
86 pub(crate) fn create_and_setup(
87 series_id: OptionSeriesId,
88 cache: &Rc<RefCell<Cache>>,
89 cmd: &SubscribeOptionChain,
90 clock: &Rc<RefCell<dyn Clock>>,
91 msgbus_priority: u8,
92 client: Option<&mut DataClientAdapter>,
93 initial_atm_price: Option<Price>,
94 deferred_cmd_queue: DeferredCommandQueue,
95 ) -> Rc<RefCell<Self>> {
96 let topic = switchboard::get_option_chain_topic(series_id);
97 let instruments = Self::resolve_instruments(cache, &series_id);
98
99 let mut tracker = AtmTracker::new();
100
101 if let Some((strike, _)) = instruments.values().next() {
103 tracker.set_forward_precision(strike.precision);
104 }
105
106 if let Some(price) = initial_atm_price {
107 tracker.set_initial_price(price);
108 log::info!("Pre-populated ATM with forward price: {price}");
109 }
110 let aggregator =
111 OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113 let active_instrument_ids = aggregator.instrument_ids();
116 let all_instrument_ids = aggregator.all_instrument_ids();
117 let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120 let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122 let manager = Self {
123 aggregator,
124 topic,
125 quote_handlers: Vec::new(),
126 greeks_handlers: Vec::new(),
127 timer_name: None,
128 msgbus_priority,
129 bootstrapped,
130 deferred_cmd_queue,
131 clock: clock.clone(),
132 raw_mode,
133 };
134 let manager_rc = Rc::new(RefCell::new(manager));
135
136 let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138 &manager_rc,
139 &active_instrument_ids,
140 series_id,
141 msgbus_priority,
142 );
143 let greeks_handlers = Self::register_greeks_handlers(
144 &manager_rc,
145 &active_instrument_ids,
146 series_id,
147 msgbus_priority,
148 );
149
150 Self::forward_client_subscriptions(
153 client,
154 &active_instrument_ids,
155 cmd,
156 series_id.venue,
157 clock,
158 );
159
160 let timer_name = cmd
161 .snapshot_interval_ms
162 .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164 {
165 let mut mgr = manager_rc.borrow_mut();
166 mgr.quote_handlers = quote_handlers;
167 mgr.greeks_handlers = greeks_handlers;
168 mgr.timer_name = timer_name;
169 }
170
171 let mode_str = match cmd.snapshot_interval_ms {
172 Some(ms) => format!("interval={ms}ms"),
173 None => "mode=raw".to_string(),
174 };
175 log::info!(
176 "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177 active_instrument_ids.len(),
178 all_instrument_ids.len(),
179 );
180
181 manager_rc
182 }
183
184 fn register_quote_handlers(
189 manager_rc: &Rc<RefCell<Self>>,
190 instrument_ids: &[InstrumentId],
191 series_id: OptionSeriesId,
192 priority: u8,
193 ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194 let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197 handlers.push(quote_handler.clone());
198 for instrument_id in instrument_ids {
199 let topic = switchboard::get_quotes_topic(*instrument_id);
200 msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
201 handlers.push(quote_handler.clone());
202 }
203 (handlers, quote_handler)
204 }
205
206 fn register_greeks_handlers(
211 manager_rc: &Rc<RefCell<Self>>,
212 instrument_ids: &[InstrumentId],
213 series_id: OptionSeriesId,
214 priority: u8,
215 ) -> Vec<TypedHandler<OptionGreeks>> {
216 let greeks_handler =
217 TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
218 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
220 handlers.push(greeks_handler.clone());
221 for instrument_id in instrument_ids {
222 let topic = switchboard::get_option_greeks_topic(*instrument_id);
223 msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
224 handlers.push(greeks_handler.clone());
225 }
226 handlers
227 }
228
229 fn forward_client_subscriptions(
231 client: Option<&mut DataClientAdapter>,
232 instrument_ids: &[InstrumentId],
233 cmd: &SubscribeOptionChain,
234 venue: Venue,
235 clock: &Rc<RefCell<dyn Clock>>,
236 ) {
237 let ts_init = clock.borrow().timestamp_ns();
238
239 let Some(client) = client else {
240 log::error!(
241 "Cannot forward option chain subscriptions: no client found for venue={venue}",
242 );
243 return;
244 };
245
246 for instrument_id in instrument_ids {
247 client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
248 instrument_id: *instrument_id,
249 client_id: cmd.client_id,
250 venue: Some(venue),
251 command_id: UUID4::new(),
252 ts_init,
253 correlation_id: None,
254 params: None,
255 }));
256 client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
257 instrument_id: *instrument_id,
258 client_id: cmd.client_id,
259 venue: Some(venue),
260 command_id: UUID4::new(),
261 ts_init,
262 correlation_id: None,
263 params: None,
264 }));
265 client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
266 SubscribeInstrumentStatus {
267 instrument_id: *instrument_id,
268 client_id: cmd.client_id,
269 venue: Some(venue),
270 command_id: UUID4::new(),
271 ts_init,
272 correlation_id: None,
273 params: None,
274 },
275 ));
276 }
277
278 log::info!(
279 "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
280 instrument_ids.len(),
281 );
282 }
283
284 fn setup_timer(
286 manager_rc: &Rc<RefCell<Self>>,
287 series_id: OptionSeriesId,
288 interval_ms: u64,
289 clock: &Rc<RefCell<dyn Clock>>,
290 ) -> Ustr {
291 let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
292 let publisher = OptionChainSlicePublisher::new(manager_rc);
293 let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
294
295 let now_ns = clock.borrow().timestamp_ns().as_u64();
296 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
297
298 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
299 let callback = TimeEventCallback::from(callback_fn);
300
301 clock
302 .borrow_mut()
303 .set_timer_ns(
304 &timer_name,
305 interval_ns,
306 Some(start_time_ns.into()),
307 None,
308 Some(callback),
309 None,
310 None,
311 )
312 .expect(FAILED);
313
314 timer_name
315 }
316
317 #[must_use]
319 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
320 self.aggregator.all_instrument_ids()
321 }
322
323 #[must_use]
325 pub fn venue(&self) -> Venue {
326 self.aggregator.series_id().venue
327 }
328
329 pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
331 let instrument_ids = self.aggregator.instrument_ids();
333
334 if let Some(handler) = self.quote_handlers.first() {
336 for instrument_id in &instrument_ids {
337 let topic = switchboard::get_quotes_topic(*instrument_id);
338 msgbus::unsubscribe_quotes(topic.into(), handler);
339 }
340 }
341
342 if let Some(handler) = self.greeks_handlers.first() {
344 for instrument_id in &instrument_ids {
345 let topic = switchboard::get_option_greeks_topic(*instrument_id);
346 msgbus::unsubscribe_option_greeks(topic.into(), handler);
347 }
348 }
349
350 if let Some(timer_name) = self.timer_name.take() {
352 let mut clk = clock.borrow_mut();
353 if clk.timer_exists(&timer_name) {
354 clk.cancel_timer(&timer_name);
355 }
356 }
357
358 self.quote_handlers.clear();
359 self.greeks_handlers.clear();
360 }
361
362 pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
367 self.aggregator
369 .atm_tracker_mut()
370 .update_from_option_greeks(greeks);
371 self.aggregator.update_greeks(greeks);
373 self.maybe_bootstrap();
375
376 if self.raw_mode
377 && self.bootstrapped
378 && self.aggregator.active_ids().contains(&greeks.instrument_id)
379 {
380 self.publish_slice(greeks.ts_event);
381 }
382 }
383
384 pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
390 let was_active = self.aggregator.active_ids().contains(instrument_id);
391
392 if !self.aggregator.remove_instrument(instrument_id) {
393 return self.aggregator.is_catalog_empty();
394 }
395
396 if was_active {
397 if let Some(qh) = self.quote_handlers.first() {
399 let topic = switchboard::get_quotes_topic(*instrument_id);
400 msgbus::unsubscribe_quotes(topic.into(), qh);
401 }
402
403 if let Some(gh) = self.greeks_handlers.first() {
404 let topic = switchboard::get_option_greeks_topic(*instrument_id);
405 msgbus::unsubscribe_option_greeks(topic.into(), gh);
406 }
407
408 self.push_unsubscribe_commands(*instrument_id);
410 }
411
412 log::info!(
413 "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
414 self.aggregator.series_id(),
415 self.aggregator.instruments().len(),
416 );
417
418 self.aggregator.is_catalog_empty()
419 }
420
421 pub fn handle_quote(&mut self, quote: &QuoteTick) {
426 self.aggregator.update_quote(quote);
427 self.maybe_bootstrap();
428
429 if self.raw_mode
430 && self.bootstrapped
431 && self.aggregator.active_ids().contains("e.instrument_id)
432 {
433 self.publish_slice(quote.ts_event);
434 }
435 }
436
437 fn maybe_bootstrap(&mut self) {
442 if self.bootstrapped {
443 return;
444 }
445
446 if self.aggregator.atm_tracker().atm_price().is_none() {
447 return;
448 }
449
450 let active_ids = self.aggregator.recompute_active_set();
452 self.register_handlers_for_instruments_bulk(&active_ids);
453
454 for &id in &active_ids {
455 self.push_subscribe_commands(id);
456 }
457
458 self.bootstrapped = true;
459
460 log::info!(
461 "Bootstrapped option chain for {} ({} active instruments)",
462 self.aggregator.series_id(),
463 active_ids.len(),
464 );
465 }
466
467 fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
469 for &id in instrument_ids {
470 self.register_handlers_for_instrument(id);
471 }
472 }
473
474 pub fn add_instrument(
480 &mut self,
481 instrument_id: InstrumentId,
482 strike: Price,
483 kind: OptionKind,
484 client: Option<&mut DataClientAdapter>,
485 clock: &Rc<RefCell<dyn Clock>>,
486 ) -> bool {
487 if !self.aggregator.add_instrument(instrument_id, strike, kind) {
488 return false;
489 }
490
491 if self.aggregator.active_ids().contains(&instrument_id) {
492 self.register_handlers_for_instrument(instrument_id);
493 }
494
495 let venue = self.aggregator.series_id().venue;
496 Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
497
498 log::info!(
499 "Added instrument {instrument_id} to option chain {} (active={})",
500 self.aggregator.series_id(),
501 self.aggregator.active_ids().contains(&instrument_id),
502 );
503
504 true
505 }
506
507 fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
508 if let Some(qh) = self.quote_handlers.first().cloned() {
509 let topic = switchboard::get_quotes_topic(instrument_id);
510 msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
511 }
512
513 if let Some(gh) = self.greeks_handlers.first().cloned() {
514 let topic = switchboard::get_option_greeks_topic(instrument_id);
515 msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
516 }
517 }
518
519 fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
521 let venue = self.aggregator.series_id().venue;
522 let ts_init = self.clock.borrow().timestamp_ns();
523 let mut queue = self.deferred_cmd_queue.borrow_mut();
524 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
525 SubscribeQuotes {
526 instrument_id,
527 client_id: None,
528 venue: Some(venue),
529 command_id: UUID4::new(),
530 ts_init,
531 correlation_id: None,
532 params: None,
533 },
534 )));
535 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
536 SubscribeOptionGreeks {
537 instrument_id,
538 client_id: None,
539 venue: Some(venue),
540 command_id: UUID4::new(),
541 ts_init,
542 correlation_id: None,
543 params: None,
544 },
545 )));
546 queue.push_back(DeferredCommand::Subscribe(
547 SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
548 instrument_id,
549 client_id: None,
550 venue: Some(venue),
551 command_id: UUID4::new(),
552 ts_init,
553 correlation_id: None,
554 params: None,
555 }),
556 ));
557 }
558
559 fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
561 let venue = self.aggregator.series_id().venue;
562 let ts_init = self.clock.borrow().timestamp_ns();
563 let mut queue = self.deferred_cmd_queue.borrow_mut();
564 queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
565 UnsubscribeQuotes {
566 instrument_id,
567 client_id: None,
568 venue: Some(venue),
569 command_id: UUID4::new(),
570 ts_init,
571 correlation_id: None,
572 params: None,
573 },
574 )));
575 queue.push_back(DeferredCommand::Unsubscribe(
576 UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
577 instrument_id,
578 client_id: None,
579 venue: Some(venue),
580 command_id: UUID4::new(),
581 ts_init,
582 correlation_id: None,
583 params: None,
584 }),
585 ));
586 queue.push_back(DeferredCommand::Unsubscribe(
587 UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
588 instrument_id,
589 client_id: None,
590 venue: Some(venue),
591 command_id: UUID4::new(),
592 ts_init,
593 correlation_id: None,
594 params: None,
595 }),
596 ));
597 }
598
599 fn forward_instrument_subscriptions(
601 client: Option<&mut DataClientAdapter>,
602 instrument_id: InstrumentId,
603 venue: Venue,
604 clock: &Rc<RefCell<dyn Clock>>,
605 ) {
606 let Some(client) = client else {
607 log::error!(
608 "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
609 );
610 return;
611 };
612
613 let ts_init = clock.borrow().timestamp_ns();
614
615 client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
616 instrument_id,
617 client_id: None,
618 venue: Some(venue),
619 command_id: UUID4::new(),
620 ts_init,
621 correlation_id: None,
622 params: None,
623 }));
624 client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
625 instrument_id,
626 client_id: None,
627 venue: Some(venue),
628 command_id: UUID4::new(),
629 ts_init,
630 correlation_id: None,
631 params: None,
632 }));
633 client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
634 SubscribeInstrumentStatus {
635 instrument_id,
636 client_id: None,
637 venue: Some(venue),
638 command_id: UUID4::new(),
639 ts_init,
640 correlation_id: None,
641 params: None,
642 },
643 ));
644 }
645
646 fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
648 let Some(action) = self.aggregator.check_rebalance(now_ns) else {
649 return;
650 };
651
652 if let Some(qh) = self.quote_handlers.first() {
654 for id in &action.remove {
655 msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
656 }
657 }
658
659 if let Some(gh) = self.greeks_handlers.first() {
660 for id in &action.remove {
661 msgbus::unsubscribe_option_greeks(
662 switchboard::get_option_greeks_topic(*id).into(),
663 gh,
664 );
665 }
666 }
667
668 if let Some(qh) = self.quote_handlers.first().cloned() {
670 for id in &action.add {
671 msgbus::subscribe_quotes(
672 switchboard::get_quotes_topic(*id).into(),
673 qh.clone(),
674 Some(self.msgbus_priority),
675 );
676 }
677 }
678
679 if let Some(gh) = self.greeks_handlers.first().cloned() {
680 for id in &action.add {
681 msgbus::subscribe_option_greeks(
682 switchboard::get_option_greeks_topic(*id).into(),
683 gh.clone(),
684 Some(self.msgbus_priority),
685 );
686 }
687 }
688
689 for &id in &action.add {
691 self.push_subscribe_commands(id);
692 }
693 for &id in &action.remove {
694 self.push_unsubscribe_commands(id);
695 }
696
697 if !action.add.is_empty() || !action.remove.is_empty() {
698 log::info!(
699 "Rebalanced option chain for {}: +{} -{} instruments",
700 self.aggregator.series_id(),
701 action.add.len(),
702 action.remove.len(),
703 );
704 }
705
706 self.aggregator.apply_rebalance(&action, now_ns);
708 }
709
710 pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
712 if self.aggregator.is_expired(ts) {
714 self.deferred_cmd_queue
715 .borrow_mut()
716 .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
717 return;
718 }
719
720 self.maybe_rebalance(ts);
721
722 let series_id = self.aggregator.series_id();
723 let slice = self.aggregator.snapshot(ts);
724
725 if slice.is_empty() {
726 log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
727 return;
728 }
729
730 log::debug!(
731 "Publishing OptionChainSlice for {} (calls={}, puts={})",
732 series_id,
733 slice.call_count(),
734 slice.put_count(),
735 );
736 msgbus::publish_option_chain(self.topic, &slice);
737 }
738
739 fn resolve_instruments(
741 cache: &Rc<RefCell<Cache>>,
742 series_id: &OptionSeriesId,
743 ) -> HashMap<InstrumentId, (Price, OptionKind)> {
744 let cache = cache.borrow();
745 let mut map = HashMap::new();
746
747 for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
748 let Some(expiration) = instrument.expiration_ns() else {
749 continue;
750 };
751
752 if expiration != series_id.expiration_ns {
753 continue;
754 }
755
756 if instrument.settlement_currency().code != series_id.settlement_currency {
757 continue;
758 }
759
760 let Some(strike) = instrument.strike_price() else {
761 continue;
762 };
763
764 let Some(kind) = instrument.option_kind() else {
765 continue;
766 };
767
768 map.insert(instrument.id(), (strike, kind));
769 }
770
771 map
772 }
773}
774
775#[cfg(test)]
776mod tests {
777 use std::collections::VecDeque;
778
779 use nautilus_common::clock::TestClock;
780 use nautilus_core::UnixNanos;
781 use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
782 use rstest::*;
783
784 use super::*;
785
786 fn make_series_id() -> OptionSeriesId {
787 OptionSeriesId::new(
788 Venue::new("DERIBIT"),
789 ustr::Ustr::from("BTC"),
790 ustr::Ustr::from("BTC"),
791 UnixNanos::from(1_700_000_000_000_000_000u64),
792 )
793 }
794
795 fn make_test_queue() -> DeferredCommandQueue {
796 Rc::new(RefCell::new(VecDeque::new()))
797 }
798
799 fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
800 let series_id = make_series_id();
801 let topic = switchboard::get_option_chain_topic(series_id);
802 let tracker = AtmTracker::new();
803 let aggregator = OptionChainAggregator::new(
804 series_id,
805 StrikeRange::Fixed(vec![]),
806 tracker,
807 HashMap::new(),
808 );
809 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
810 let queue = make_test_queue();
811
812 let manager = OptionChainManager {
813 aggregator,
814 topic,
815 quote_handlers: Vec::new(),
816 greeks_handlers: Vec::new(),
817 timer_name: None,
818 msgbus_priority: 0,
819 bootstrapped: true,
820 deferred_cmd_queue: queue.clone(),
821 clock,
822 raw_mode: false,
823 };
824 (manager, queue)
825 }
826
827 #[rstest]
828 fn test_manager_handle_quote_no_instrument() {
829 let (mut manager, _queue) = make_manager();
830
831 let quote = QuoteTick::new(
833 InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
834 Price::from("100.00"),
835 Price::from("101.00"),
836 Quantity::from("1.0"),
837 Quantity::from("1.0"),
838 UnixNanos::from(1u64),
839 UnixNanos::from(1u64),
840 );
841 manager.handle_quote("e);
842 }
843
844 #[rstest]
845 fn test_manager_publish_slice_empty() {
846 let (mut manager, _queue) = make_manager();
847 manager.publish_slice(UnixNanos::from(100u64));
849 }
850
851 #[rstest]
852 fn test_manager_teardown_no_handlers() {
853 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
854 let (mut manager, _queue) = make_manager();
855 manager.teardown(&clock);
857 assert!(manager.quote_handlers.is_empty());
858 }
859
860 fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
861 let series_id = make_series_id();
862 let topic = switchboard::get_option_chain_topic(series_id);
863
864 let strikes = [45000, 47500, 50000, 52500, 55000];
865 let mut instruments = HashMap::new();
866 for s in &strikes {
867 let strike = Price::from(&s.to_string());
868 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
869 let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
870 instruments.insert(call_id, (strike, OptionKind::Call));
871 instruments.insert(put_id, (strike, OptionKind::Put));
872 }
873
874 let tracker = AtmTracker::new();
875 let aggregator = OptionChainAggregator::new(
876 series_id,
877 StrikeRange::AtmRelative {
878 strikes_above: 1,
879 strikes_below: 1,
880 },
881 tracker,
882 instruments,
883 );
884 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
885 let queue = make_test_queue();
886
887 let manager = OptionChainManager {
888 aggregator,
889 topic,
890 quote_handlers: Vec::new(),
891 greeks_handlers: Vec::new(),
892 timer_name: None,
893 msgbus_priority: 0,
894 bootstrapped: false,
895 deferred_cmd_queue: queue.clone(),
896 clock,
897 raw_mode: false,
898 };
899 (manager, queue)
900 }
901
902 fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
903 use nautilus_model::data::option_chain::OptionGreeks;
904 let greeks = OptionGreeks {
905 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
906 underlying_price: Some(50000.0),
907 ..Default::default()
908 };
909 manager.handle_greeks(&greeks);
910 }
911
912 #[rstest]
913 fn test_manager_publish_slice_triggers_rebalance() {
914 let (mut manager, queue) = make_option_chain_manager();
915 assert_eq!(manager.aggregator.instrument_ids().len(), 0);
917
918 bootstrap_via_greeks(&mut manager);
920 assert!(manager.bootstrapped);
921 assert_eq!(manager.aggregator.instrument_ids().len(), 6); assert_eq!(queue.borrow().len(), 18);
925
926 manager.publish_slice(UnixNanos::from(100u64));
928 assert!(manager.aggregator.last_atm_strike().is_some());
929 }
930
931 #[rstest]
932 fn test_manager_add_instrument_new() {
933 let (mut manager, _queue) = make_option_chain_manager();
934 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
935 let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
936 let strike = Price::from("57500");
937 let count_before = manager.aggregator.instruments().len();
938
939 let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);
940
941 assert!(result);
942 assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
943 }
944
945 #[rstest]
946 fn test_manager_add_instrument_already_known() {
947 let (mut manager, _queue) = make_option_chain_manager();
948 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
949 let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
950 let strike = Price::from("50000");
951 let count_before = manager.aggregator.instruments().len();
952
953 let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);
954
955 assert!(!result);
956 assert_eq!(manager.aggregator.instruments().len(), count_before);
957 }
958
959 #[rstest]
960 fn test_manager_deferred_bootstrap_on_first_atm() {
961 let (mut manager, queue) = make_option_chain_manager();
962 assert!(!manager.bootstrapped);
964 assert_eq!(manager.aggregator.instrument_ids().len(), 0);
965 assert!(queue.borrow().is_empty());
966
967 bootstrap_via_greeks(&mut manager);
969
970 assert!(manager.bootstrapped);
971 assert_eq!(manager.aggregator.instrument_ids().len(), 6); assert_eq!(queue.borrow().len(), 18);
974
975 assert!(
977 queue
978 .borrow()
979 .iter()
980 .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
981 );
982 }
983
984 #[rstest]
985 fn test_manager_bootstrap_idempotent() {
986 use nautilus_model::data::option_chain::OptionGreeks;
987
988 let (mut manager, _queue) = make_option_chain_manager();
989 bootstrap_via_greeks(&mut manager);
990 assert!(manager.bootstrapped);
991 let count = manager.aggregator.instrument_ids().len();
992
993 let greeks2 = OptionGreeks {
995 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
996 underlying_price: Some(50200.0),
997 ..Default::default()
998 };
999 manager.handle_greeks(&greeks2);
1000 assert_eq!(manager.aggregator.instrument_ids().len(), count);
1001 }
1002
1003 #[rstest]
1004 fn test_manager_fixed_range_bootstrapped_immediately() {
1005 let (manager, queue) = make_manager();
1007 assert!(manager.bootstrapped);
1008 assert!(queue.borrow().is_empty());
1009 }
1010
1011 #[rstest]
1012 fn test_manager_forward_price_bootstrap_from_greeks() {
1013 use nautilus_model::data::option_chain::OptionGreeks;
1014
1015 let (mut manager, _queue) = make_option_chain_manager();
1016 assert!(!manager.bootstrapped);
1017
1018 let greeks = OptionGreeks {
1020 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1021 underlying_price: Some(50000.0),
1022 ..Default::default()
1023 };
1024 manager.handle_greeks(&greeks);
1025 assert!(manager.bootstrapped);
1026 assert_eq!(manager.aggregator.instrument_ids().len(), 6);
1028 }
1029
1030 #[rstest]
1031 fn test_manager_forward_price_no_bootstrap_without_underlying() {
1032 use nautilus_model::data::option_chain::OptionGreeks;
1033
1034 let (mut manager, _queue) = make_option_chain_manager();
1035 assert!(!manager.bootstrapped);
1036
1037 let greeks = OptionGreeks {
1039 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1040 underlying_price: None,
1041 ..Default::default()
1042 };
1043 manager.handle_greeks(&greeks);
1044 assert!(!manager.bootstrapped);
1045 }
1046
1047 #[rstest]
1048 fn test_handle_instrument_expired_removes_from_aggregator() {
1049 let (mut manager, queue) = make_option_chain_manager();
1050 bootstrap_via_greeks(&mut manager);
1052 assert!(manager.bootstrapped);
1053 let initial_count = manager.aggregator.instruments().len();
1054 queue.borrow_mut().clear(); let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1057 let is_empty = manager.handle_instrument_expired(&expired_id);
1058
1059 assert!(!is_empty);
1060 assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
1061 assert!(!manager.aggregator.active_ids().contains(&expired_id));
1062 }
1063
1064 #[rstest]
1065 fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
1066 let (mut manager, queue) = make_option_chain_manager();
1067 bootstrap_via_greeks(&mut manager);
1068 queue.borrow_mut().clear();
1069
1070 let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1071 manager.handle_instrument_expired(&expired_id);
1072
1073 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1075 assert_eq!(cmds.len(), 3);
1076 assert!(
1077 cmds.iter()
1078 .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
1079 );
1080 }
1081
1082 #[rstest]
1083 fn test_handle_instrument_expired_returns_true_when_last() {
1084 let series_id = make_series_id();
1085 let topic = switchboard::get_option_chain_topic(series_id);
1086 let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1087 let strike = Price::from("50000");
1088 let mut instruments = HashMap::new();
1089 instruments.insert(call_id, (strike, OptionKind::Call));
1090 let tracker = AtmTracker::new();
1091 let aggregator = OptionChainAggregator::new(
1092 series_id,
1093 StrikeRange::Fixed(vec![strike]),
1094 tracker,
1095 instruments,
1096 );
1097 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1098 let queue = make_test_queue();
1099
1100 let mut manager = OptionChainManager {
1101 aggregator,
1102 topic,
1103 quote_handlers: Vec::new(),
1104 greeks_handlers: Vec::new(),
1105 timer_name: None,
1106 msgbus_priority: 0,
1107 bootstrapped: true,
1108 deferred_cmd_queue: queue,
1109 clock,
1110 raw_mode: false,
1111 };
1112
1113 let is_empty = manager.handle_instrument_expired(&call_id);
1114 assert!(is_empty);
1115 assert!(manager.aggregator.is_catalog_empty());
1116 }
1117
1118 #[rstest]
1119 fn test_handle_instrument_expired_unknown_noop() {
1120 let (mut manager, queue) = make_manager();
1121 queue.borrow_mut().clear();
1122
1123 let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1124 let is_empty = manager.handle_instrument_expired(&unknown);
1125
1126 assert!(is_empty);
1128 assert!(queue.borrow().is_empty()); }
1130
1131 #[rstest]
1132 fn test_publish_slice_pushes_expire_series_when_expired() {
1133 let (mut manager, queue) = make_option_chain_manager();
1134 bootstrap_via_greeks(&mut manager);
1135 queue.borrow_mut().clear();
1136
1137 let expiry_ns = manager.aggregator.series_id().expiration_ns;
1139 manager.publish_slice(expiry_ns);
1140
1141 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1142 assert_eq!(cmds.len(), 1);
1143 assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
1144 }
1145
1146 #[rstest]
1147 fn test_expired_instrument_unsubscribes_include_instrument_status() {
1148 let (mut manager, queue) = make_option_chain_manager();
1149 bootstrap_via_greeks(&mut manager);
1150 queue.borrow_mut().clear();
1151
1152 let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1153 manager.handle_instrument_expired(&expired_id);
1154
1155 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1156 let status_unsubs = cmds
1158 .iter()
1159 .filter(|c| {
1160 matches!(
1161 c,
1162 DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
1163 )
1164 })
1165 .count();
1166 assert_eq!(status_unsubs, 1);
1167 }
1168}