1use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26 cache::Cache,
27 clock::Clock,
28 messages::data::{
29 SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30 SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31 UnsubscribeQuotes,
32 },
33 msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34 timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38 data::{QuoteTick, option_chain::OptionGreeks},
39 enums::OptionKind,
40 identifiers::{InstrumentId, OptionSeriesId, Venue},
41 instruments::Instrument,
42 types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47 AtmTracker, OptionChainAggregator,
48 handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51 client::DataClientAdapter,
52 engine::{DeferredCommand, DeferredCommandQueue},
53};
54
55#[derive(Debug)]
61pub struct OptionChainManager {
62 aggregator: OptionChainAggregator,
63 topic: MStr<Topic>,
64 quote_handlers: Vec<TypedHandler<QuoteTick>>,
65 greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
66 timer_name: Option<Ustr>,
67 msgbus_priority: u32,
68 bootstrapped: bool,
70 deferred_cmd_queue: DeferredCommandQueue,
72 clock: Rc<RefCell<dyn Clock>>,
74 raw_mode: bool,
76}
77
78impl OptionChainManager {
79 #[expect(clippy::too_many_arguments)]
86 pub(crate) fn create_and_setup(
87 series_id: OptionSeriesId,
88 cache: &Rc<RefCell<Cache>>,
89 cmd: &SubscribeOptionChain,
90 clock: &Rc<RefCell<dyn Clock>>,
91 msgbus_priority: u32,
92 client: Option<&mut DataClientAdapter>,
93 initial_atm_price: Option<Price>,
94 deferred_cmd_queue: DeferredCommandQueue,
95 ) -> Rc<RefCell<Self>> {
96 let topic = switchboard::get_option_chain_topic(series_id);
97 let instruments = Self::resolve_instruments(cache, &series_id);
98
99 let mut tracker = AtmTracker::new();
100
101 if let Some((strike, _)) = instruments.values().next() {
103 tracker.set_forward_precision(strike.precision);
104 }
105
106 if let Some(price) = initial_atm_price {
107 tracker.set_initial_price(price);
108 log::info!("Pre-populated ATM with forward price: {price}");
109 }
110 let aggregator =
111 OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113 let active_instrument_ids = aggregator.instrument_ids();
116 let all_instrument_ids = aggregator.all_instrument_ids();
117 let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120 let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122 let manager = Self {
123 aggregator,
124 topic,
125 quote_handlers: Vec::new(),
126 greeks_handlers: Vec::new(),
127 timer_name: None,
128 msgbus_priority,
129 bootstrapped,
130 deferred_cmd_queue,
131 clock: clock.clone(),
132 raw_mode,
133 };
134 let manager_rc = Rc::new(RefCell::new(manager));
135
136 let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138 &manager_rc,
139 &active_instrument_ids,
140 series_id,
141 msgbus_priority,
142 );
143 let greeks_handlers = Self::register_greeks_handlers(
144 &manager_rc,
145 &active_instrument_ids,
146 series_id,
147 msgbus_priority,
148 );
149
150 Self::forward_client_subscriptions(
153 client,
154 &active_instrument_ids,
155 cmd,
156 series_id.venue,
157 clock,
158 );
159
160 let timer_name = cmd
161 .snapshot_interval_ms
162 .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164 {
165 let mut mgr = manager_rc.borrow_mut();
166 mgr.quote_handlers = quote_handlers;
167 mgr.greeks_handlers = greeks_handlers;
168 mgr.timer_name = timer_name;
169 }
170
171 let mode_str = match cmd.snapshot_interval_ms {
172 Some(ms) => format!("interval={ms}ms"),
173 None => "mode=raw".to_string(),
174 };
175 log::info!(
176 "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177 active_instrument_ids.len(),
178 all_instrument_ids.len(),
179 );
180
181 manager_rc
182 }
183
184 fn register_quote_handlers(
189 manager_rc: &Rc<RefCell<Self>>,
190 instrument_ids: &[InstrumentId],
191 series_id: OptionSeriesId,
192 priority: u32,
193 ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194 let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197 handlers.push(quote_handler.clone());
198
199 for instrument_id in instrument_ids {
200 let topic = switchboard::get_quotes_topic(*instrument_id);
201 msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
202 handlers.push(quote_handler.clone());
203 }
204 (handlers, quote_handler)
205 }
206
207 fn register_greeks_handlers(
212 manager_rc: &Rc<RefCell<Self>>,
213 instrument_ids: &[InstrumentId],
214 series_id: OptionSeriesId,
215 priority: u32,
216 ) -> Vec<TypedHandler<OptionGreeks>> {
217 let greeks_handler =
218 TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
219 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
221 handlers.push(greeks_handler.clone());
222
223 for instrument_id in instrument_ids {
224 let topic = switchboard::get_option_greeks_topic(*instrument_id);
225 msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
226 handlers.push(greeks_handler.clone());
227 }
228 handlers
229 }
230
231 fn forward_client_subscriptions(
233 client: Option<&mut DataClientAdapter>,
234 instrument_ids: &[InstrumentId],
235 cmd: &SubscribeOptionChain,
236 venue: Venue,
237 clock: &Rc<RefCell<dyn Clock>>,
238 ) {
239 let ts_init = clock.borrow().timestamp_ns();
240
241 let Some(client) = client else {
242 log::error!(
243 "Cannot forward option chain subscriptions: no client found for venue={venue}",
244 );
245 return;
246 };
247
248 for instrument_id in instrument_ids {
249 client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
250 instrument_id: *instrument_id,
251 client_id: cmd.client_id,
252 venue: Some(venue),
253 command_id: UUID4::new(),
254 ts_init,
255 correlation_id: None,
256 params: None,
257 }));
258 client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
259 instrument_id: *instrument_id,
260 client_id: cmd.client_id,
261 venue: Some(venue),
262 command_id: UUID4::new(),
263 ts_init,
264 correlation_id: None,
265 params: None,
266 }));
267 client.execute_subscribe(SubscribeCommand::InstrumentStatus(
268 SubscribeInstrumentStatus {
269 instrument_id: *instrument_id,
270 client_id: cmd.client_id,
271 venue: Some(venue),
272 command_id: UUID4::new(),
273 ts_init,
274 correlation_id: None,
275 params: None,
276 },
277 ));
278 }
279
280 log::info!(
281 "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
282 instrument_ids.len(),
283 );
284 }
285
286 fn setup_timer(
288 manager_rc: &Rc<RefCell<Self>>,
289 series_id: OptionSeriesId,
290 interval_ms: u64,
291 clock: &Rc<RefCell<dyn Clock>>,
292 ) -> Ustr {
293 let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
294 let publisher = OptionChainSlicePublisher::new(manager_rc);
295 let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
296
297 let now_ns = clock.borrow().timestamp_ns().as_u64();
298 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
299
300 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
301 let callback = TimeEventCallback::from(callback_fn);
302
303 clock
304 .borrow_mut()
305 .set_timer_ns(
306 &timer_name,
307 interval_ns,
308 Some(start_time_ns.into()),
309 None,
310 Some(callback),
311 None,
312 None,
313 )
314 .expect(FAILED);
315
316 timer_name
317 }
318
319 #[must_use]
321 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
322 self.aggregator.all_instrument_ids()
323 }
324
325 #[must_use]
327 pub fn venue(&self) -> Venue {
328 self.aggregator.series_id().venue
329 }
330
331 pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
333 let instrument_ids = self.aggregator.instrument_ids();
335
336 if let Some(handler) = self.quote_handlers.first() {
338 for instrument_id in &instrument_ids {
339 let topic = switchboard::get_quotes_topic(*instrument_id);
340 msgbus::unsubscribe_quotes(topic.into(), handler);
341 }
342 }
343
344 if let Some(handler) = self.greeks_handlers.first() {
346 for instrument_id in &instrument_ids {
347 let topic = switchboard::get_option_greeks_topic(*instrument_id);
348 msgbus::unsubscribe_option_greeks(topic.into(), handler);
349 }
350 }
351
352 if let Some(timer_name) = self.timer_name.take() {
354 let mut clk = clock.borrow_mut();
355 if clk.timer_exists(&timer_name) {
356 clk.cancel_timer(&timer_name);
357 }
358 }
359
360 self.quote_handlers.clear();
361 self.greeks_handlers.clear();
362 }
363
364 pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
369 if self.aggregator.is_expired(greeks.ts_event) {
370 log::warn!(
371 "Dropping greeks for {}, series {} expired",
372 greeks.instrument_id,
373 self.aggregator.series_id(),
374 );
375 self.deferred_cmd_queue
376 .borrow_mut()
377 .push_back(DeferredCommand::ExpireInstrument(greeks.instrument_id));
378 return;
379 }
380
381 self.aggregator
383 .atm_tracker_mut()
384 .update_from_option_greeks(greeks);
385 self.aggregator.update_greeks(greeks);
387 self.maybe_bootstrap();
389
390 if self.raw_mode
391 && self.bootstrapped
392 && self.aggregator.active_ids().contains(&greeks.instrument_id)
393 {
394 self.publish_slice(greeks.ts_event);
395 }
396 }
397
398 pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
404 let was_active = self.aggregator.active_ids().contains(instrument_id);
405
406 if !self.aggregator.remove_instrument(instrument_id) {
407 return self.aggregator.is_catalog_empty();
408 }
409
410 if was_active {
411 if let Some(qh) = self.quote_handlers.first() {
413 let topic = switchboard::get_quotes_topic(*instrument_id);
414 msgbus::unsubscribe_quotes(topic.into(), qh);
415 }
416
417 if let Some(gh) = self.greeks_handlers.first() {
418 let topic = switchboard::get_option_greeks_topic(*instrument_id);
419 msgbus::unsubscribe_option_greeks(topic.into(), gh);
420 }
421
422 self.push_unsubscribe_commands(*instrument_id);
424 }
425
426 log::info!(
427 "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
428 self.aggregator.series_id(),
429 self.aggregator.instruments().len(),
430 );
431
432 self.aggregator.is_catalog_empty()
433 }
434
435 pub fn handle_quote(&mut self, quote: &QuoteTick) {
440 if self.aggregator.is_expired(quote.ts_event) {
441 log::warn!(
442 "Dropping quote for {}, series {} expired",
443 quote.instrument_id,
444 self.aggregator.series_id(),
445 );
446 self.deferred_cmd_queue
447 .borrow_mut()
448 .push_back(DeferredCommand::ExpireInstrument(quote.instrument_id));
449 return;
450 }
451
452 self.aggregator.update_quote(quote);
453 self.maybe_bootstrap();
454
455 if self.raw_mode
456 && self.bootstrapped
457 && self.aggregator.active_ids().contains("e.instrument_id)
458 {
459 self.publish_slice(quote.ts_event);
460 }
461 }
462
463 fn maybe_bootstrap(&mut self) {
468 if self.bootstrapped {
469 return;
470 }
471
472 if self.aggregator.atm_tracker().atm_price().is_none() {
473 return;
474 }
475
476 let active_ids = self.aggregator.recompute_active_set();
478 self.register_handlers_for_instruments_bulk(&active_ids);
479
480 for &id in &active_ids {
481 self.push_subscribe_commands(id);
482 }
483
484 self.bootstrapped = true;
485
486 log::info!(
487 "Bootstrapped option chain for {} ({} active instruments)",
488 self.aggregator.series_id(),
489 active_ids.len(),
490 );
491 }
492
493 fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
495 for &id in instrument_ids {
496 self.register_handlers_for_instrument(id);
497 }
498 }
499
500 pub fn add_instrument(
506 &mut self,
507 instrument_id: InstrumentId,
508 strike: Price,
509 kind: OptionKind,
510 client: Option<&mut DataClientAdapter>,
511 clock: &Rc<RefCell<dyn Clock>>,
512 ) -> bool {
513 if !self.aggregator.add_instrument(instrument_id, strike, kind) {
514 return false;
515 }
516
517 if self.aggregator.active_ids().contains(&instrument_id) {
518 self.register_handlers_for_instrument(instrument_id);
519 }
520
521 let venue = self.aggregator.series_id().venue;
522 Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
523
524 log::info!(
525 "Added instrument {instrument_id} to option chain {} (active={})",
526 self.aggregator.series_id(),
527 self.aggregator.active_ids().contains(&instrument_id),
528 );
529
530 true
531 }
532
533 fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
534 if let Some(qh) = self.quote_handlers.first().cloned() {
535 let topic = switchboard::get_quotes_topic(instrument_id);
536 msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
537 }
538
539 if let Some(gh) = self.greeks_handlers.first().cloned() {
540 let topic = switchboard::get_option_greeks_topic(instrument_id);
541 msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
542 }
543 }
544
545 fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
547 let venue = self.aggregator.series_id().venue;
548 let ts_init = self.clock.borrow().timestamp_ns();
549 let mut queue = self.deferred_cmd_queue.borrow_mut();
550 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
551 SubscribeQuotes {
552 instrument_id,
553 client_id: None,
554 venue: Some(venue),
555 command_id: UUID4::new(),
556 ts_init,
557 correlation_id: None,
558 params: None,
559 },
560 )));
561 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
562 SubscribeOptionGreeks {
563 instrument_id,
564 client_id: None,
565 venue: Some(venue),
566 command_id: UUID4::new(),
567 ts_init,
568 correlation_id: None,
569 params: None,
570 },
571 )));
572 queue.push_back(DeferredCommand::Subscribe(
573 SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
574 instrument_id,
575 client_id: None,
576 venue: Some(venue),
577 command_id: UUID4::new(),
578 ts_init,
579 correlation_id: None,
580 params: None,
581 }),
582 ));
583 }
584
585 fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
587 let venue = self.aggregator.series_id().venue;
588 let ts_init = self.clock.borrow().timestamp_ns();
589 let mut queue = self.deferred_cmd_queue.borrow_mut();
590 queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
591 UnsubscribeQuotes {
592 instrument_id,
593 client_id: None,
594 venue: Some(venue),
595 command_id: UUID4::new(),
596 ts_init,
597 correlation_id: None,
598 params: None,
599 },
600 )));
601 queue.push_back(DeferredCommand::Unsubscribe(
602 UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
603 instrument_id,
604 client_id: None,
605 venue: Some(venue),
606 command_id: UUID4::new(),
607 ts_init,
608 correlation_id: None,
609 params: None,
610 }),
611 ));
612 queue.push_back(DeferredCommand::Unsubscribe(
613 UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
614 instrument_id,
615 client_id: None,
616 venue: Some(venue),
617 command_id: UUID4::new(),
618 ts_init,
619 correlation_id: None,
620 params: None,
621 }),
622 ));
623 }
624
625 fn forward_instrument_subscriptions(
627 client: Option<&mut DataClientAdapter>,
628 instrument_id: InstrumentId,
629 venue: Venue,
630 clock: &Rc<RefCell<dyn Clock>>,
631 ) {
632 let Some(client) = client else {
633 log::error!(
634 "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
635 );
636 return;
637 };
638
639 let ts_init = clock.borrow().timestamp_ns();
640
641 client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
642 instrument_id,
643 client_id: None,
644 venue: Some(venue),
645 command_id: UUID4::new(),
646 ts_init,
647 correlation_id: None,
648 params: None,
649 }));
650 client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
651 instrument_id,
652 client_id: None,
653 venue: Some(venue),
654 command_id: UUID4::new(),
655 ts_init,
656 correlation_id: None,
657 params: None,
658 }));
659 client.execute_subscribe(SubscribeCommand::InstrumentStatus(
660 SubscribeInstrumentStatus {
661 instrument_id,
662 client_id: None,
663 venue: Some(venue),
664 command_id: UUID4::new(),
665 ts_init,
666 correlation_id: None,
667 params: None,
668 },
669 ));
670 }
671
672 fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
674 let Some(action) = self.aggregator.check_rebalance(now_ns) else {
675 return;
676 };
677
678 if let Some(qh) = self.quote_handlers.first() {
680 for id in &action.remove {
681 msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
682 }
683 }
684
685 if let Some(gh) = self.greeks_handlers.first() {
686 for id in &action.remove {
687 msgbus::unsubscribe_option_greeks(
688 switchboard::get_option_greeks_topic(*id).into(),
689 gh,
690 );
691 }
692 }
693
694 if let Some(qh) = self.quote_handlers.first().cloned() {
696 for id in &action.add {
697 msgbus::subscribe_quotes(
698 switchboard::get_quotes_topic(*id).into(),
699 qh.clone(),
700 Some(self.msgbus_priority),
701 );
702 }
703 }
704
705 if let Some(gh) = self.greeks_handlers.first().cloned() {
706 for id in &action.add {
707 msgbus::subscribe_option_greeks(
708 switchboard::get_option_greeks_topic(*id).into(),
709 gh.clone(),
710 Some(self.msgbus_priority),
711 );
712 }
713 }
714
715 for &id in &action.add {
717 self.push_subscribe_commands(id);
718 }
719
720 for &id in &action.remove {
721 self.push_unsubscribe_commands(id);
722 }
723
724 if !action.add.is_empty() || !action.remove.is_empty() {
725 log::info!(
726 "Rebalanced option chain for {}: +{} -{} instruments",
727 self.aggregator.series_id(),
728 action.add.len(),
729 action.remove.len(),
730 );
731 }
732
733 self.aggregator.apply_rebalance(&action, now_ns);
735 }
736
737 pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
739 if self.aggregator.is_expired(ts) {
741 self.deferred_cmd_queue
742 .borrow_mut()
743 .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
744 return;
745 }
746
747 self.maybe_rebalance(ts);
748
749 let series_id = self.aggregator.series_id();
750 let slice = self.aggregator.snapshot(ts);
751
752 if slice.is_empty() {
753 log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
754 return;
755 }
756
757 log::debug!(
758 "Publishing OptionChainSlice for {} (calls={}, puts={})",
759 series_id,
760 slice.call_count(),
761 slice.put_count(),
762 );
763 msgbus::publish_option_chain(self.topic, &slice);
764 }
765
766 fn resolve_instruments(
768 cache: &Rc<RefCell<Cache>>,
769 series_id: &OptionSeriesId,
770 ) -> HashMap<InstrumentId, (Price, OptionKind)> {
771 let cache = cache.borrow();
772 let mut map = HashMap::new();
773
774 for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
775 let Some(expiration) = instrument.expiration_ns() else {
776 continue;
777 };
778
779 if expiration != series_id.expiration_ns {
780 continue;
781 }
782
783 if instrument.settlement_currency().code != series_id.settlement_currency {
784 continue;
785 }
786
787 let Some(strike) = instrument.strike_price() else {
788 continue;
789 };
790
791 let Some(kind) = instrument.option_kind() else {
792 continue;
793 };
794
795 map.insert(instrument.id(), (strike, kind));
796 }
797
798 map
799 }
800}
801
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use nautilus_common::clock::TestClock;
    use nautilus_core::UnixNanos;
    use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
    use rstest::*;

    use super::*;

    /// Series used by all tests: BTC options on DERIBIT with a fixed expiry.
    fn make_series_id() -> OptionSeriesId {
        OptionSeriesId::new(
            Venue::new("DERIBIT"),
            ustr::Ustr::from("BTC"),
            ustr::Ustr::from("BTC"),
            UnixNanos::from(1_700_000_000_000_000_000u64),
        )
    }

    /// Fresh, empty deferred command queue.
    fn make_test_queue() -> DeferredCommandQueue {
        Rc::new(RefCell::new(VecDeque::new()))
    }

    /// Manager with an empty fixed strike range and no instruments, already bootstrapped.
    fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
        let series_id = make_series_id();
        let topic = switchboard::get_option_chain_topic(series_id);
        let tracker = AtmTracker::new();
        let aggregator = OptionChainAggregator::new(
            series_id,
            StrikeRange::Fixed(vec![]),
            tracker,
            HashMap::new(),
        );
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let queue = make_test_queue();

        let manager = OptionChainManager {
            aggregator,
            topic,
            quote_handlers: Vec::new(),
            greeks_handlers: Vec::new(),
            timer_name: None,
            msgbus_priority: 0,
            bootstrapped: true,
            deferred_cmd_queue: queue.clone(),
            clock,
            raw_mode: false,
        };
        (manager, queue)
    }

    #[rstest]
    fn test_manager_handle_quote_no_instrument() {
        let (mut manager, _queue) = make_manager();

        let quote = QuoteTick::new(
            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            Price::from("100.00"),
            Price::from("101.00"),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(1u64),
            UnixNanos::from(1u64),
        );

        // A quote for an instrument the chain does not know must not panic
        manager.handle_quote(&quote);
    }

    #[rstest]
    fn test_manager_publish_slice_empty() {
        let (mut manager, _queue) = make_manager();

        // Publishing with no instruments must not panic
        manager.publish_slice(UnixNanos::from(100u64));
    }

    #[rstest]
    fn test_manager_teardown_no_handlers() {
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let (mut manager, _queue) = make_manager();
        manager.teardown(&clock);
        assert!(manager.quote_handlers.is_empty());
    }

    /// Manager with calls and puts at five strikes and an ATM-relative range of
    /// one strike above and below; not bootstrapped because no ATM price is known.
    fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
        let series_id = make_series_id();
        let topic = switchboard::get_option_chain_topic(series_id);

        let strikes = [45000, 47500, 50000, 52500, 55000];
        let mut instruments = HashMap::new();

        for s in &strikes {
            let strike = Price::from(&s.to_string());
            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
            instruments.insert(call_id, (strike, OptionKind::Call));
            instruments.insert(put_id, (strike, OptionKind::Put));
        }

        let tracker = AtmTracker::new();
        let aggregator = OptionChainAggregator::new(
            series_id,
            StrikeRange::AtmRelative {
                strikes_above: 1,
                strikes_below: 1,
            },
            tracker,
            instruments,
        );
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let queue = make_test_queue();

        let manager = OptionChainManager {
            aggregator,
            topic,
            quote_handlers: Vec::new(),
            greeks_handlers: Vec::new(),
            timer_name: None,
            msgbus_priority: 0,
            bootstrapped: false,
            deferred_cmd_queue: queue.clone(),
            clock,
            raw_mode: false,
        };
        (manager, queue)
    }

    /// Feeds greeks carrying an underlying price of 50000 so the manager can bootstrap.
    fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
        use nautilus_model::data::option_chain::OptionGreeks;
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: Some(50000.0),
            ..Default::default()
        };
        manager.handle_greeks(&greeks);
    }

    #[rstest]
    fn test_manager_publish_slice_triggers_rebalance() {
        let (mut manager, queue) = make_option_chain_manager();
        assert_eq!(manager.aggregator.instrument_ids().len(), 0);

        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        // ATM strike plus one above and one below, calls and puts
        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
        // Three subscribe commands (quotes, greeks, status) per active instrument
        assert_eq!(queue.borrow().len(), 18);

        manager.publish_slice(UnixNanos::from(100u64));
        assert!(manager.aggregator.last_atm_strike().is_some());
    }

    #[rstest]
    fn test_manager_add_instrument_new() {
        let (mut manager, _queue) = make_option_chain_manager();
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
        let strike = Price::from("57500");
        let count_before = manager.aggregator.instruments().len();

        let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);

        assert!(result);
        assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
    }

    #[rstest]
    fn test_manager_add_instrument_already_known() {
        let (mut manager, _queue) = make_option_chain_manager();
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        let strike = Price::from("50000");
        let count_before = manager.aggregator.instruments().len();

        let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);

        assert!(!result);
        assert_eq!(manager.aggregator.instruments().len(), count_before);
    }

    #[rstest]
    fn test_manager_deferred_bootstrap_on_first_atm() {
        let (mut manager, queue) = make_option_chain_manager();
        assert!(!manager.bootstrapped);
        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
        assert!(queue.borrow().is_empty());

        bootstrap_via_greeks(&mut manager);

        assert!(manager.bootstrapped);
        // ATM strike plus one above and one below, calls and puts
        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
        // Three subscribe commands (quotes, greeks, status) per active instrument
        assert_eq!(queue.borrow().len(), 18);

        assert!(
            queue
                .borrow()
                .iter()
                .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
        );
    }

    #[rstest]
    fn test_manager_bootstrap_idempotent() {
        use nautilus_model::data::option_chain::OptionGreeks;

        let (mut manager, _queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        let count = manager.aggregator.instrument_ids().len();

        // A second greeks update must not change the size of the active set
        let greeks2 = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: Some(50200.0),
            ..Default::default()
        };
        manager.handle_greeks(&greeks2);
        assert_eq!(manager.aggregator.instrument_ids().len(), count);
    }

    #[rstest]
    fn test_manager_fixed_range_bootstrapped_immediately() {
        let (manager, queue) = make_manager();
        assert!(manager.bootstrapped);
        assert!(queue.borrow().is_empty());
    }

    #[rstest]
    fn test_manager_forward_price_bootstrap_from_greeks() {
        use nautilus_model::data::option_chain::OptionGreeks;

        let (mut manager, _queue) = make_option_chain_manager();
        assert!(!manager.bootstrapped);

        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: Some(50000.0),
            ..Default::default()
        };
        manager.handle_greeks(&greeks);
        assert!(manager.bootstrapped);
        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
    }

    #[rstest]
    fn test_manager_forward_price_no_bootstrap_without_underlying() {
        use nautilus_model::data::option_chain::OptionGreeks;

        let (mut manager, _queue) = make_option_chain_manager();
        assert!(!manager.bootstrapped);

        // Without an underlying price there is no ATM price to bootstrap from
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: None,
            ..Default::default()
        };
        manager.handle_greeks(&greeks);
        assert!(!manager.bootstrapped);
    }

    #[rstest]
    fn test_handle_instrument_expired_removes_from_aggregator() {
        let (mut manager, queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        let initial_count = manager.aggregator.instruments().len();
        // Discard the subscribe commands queued by the bootstrap
        queue.borrow_mut().clear();

        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        let is_empty = manager.handle_instrument_expired(&expired_id);

        assert!(!is_empty);
        assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
        assert!(!manager.aggregator.active_ids().contains(&expired_id));
    }

    #[rstest]
    fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
        let (mut manager, queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        queue.borrow_mut().clear();

        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        manager.handle_instrument_expired(&expired_id);

        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        // Quotes, greeks and instrument status unsubscribes
        assert_eq!(cmds.len(), 3);
        assert!(
            cmds.iter()
                .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
        );
    }

    #[rstest]
    fn test_handle_instrument_expired_returns_true_when_last() {
        let series_id = make_series_id();
        let topic = switchboard::get_option_chain_topic(series_id);
        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        let strike = Price::from("50000");
        let mut instruments = HashMap::new();
        instruments.insert(call_id, (strike, OptionKind::Call));
        let tracker = AtmTracker::new();
        let aggregator = OptionChainAggregator::new(
            series_id,
            StrikeRange::Fixed(vec![strike]),
            tracker,
            instruments,
        );
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let queue = make_test_queue();

        let mut manager = OptionChainManager {
            aggregator,
            topic,
            quote_handlers: Vec::new(),
            greeks_handlers: Vec::new(),
            timer_name: None,
            msgbus_priority: 0,
            bootstrapped: true,
            deferred_cmd_queue: queue,
            clock,
            raw_mode: false,
        };

        let is_empty = manager.handle_instrument_expired(&call_id);
        assert!(is_empty);
        assert!(manager.aggregator.is_catalog_empty());
    }

    #[rstest]
    fn test_handle_instrument_expired_unknown_noop() {
        let (mut manager, queue) = make_manager();
        queue.borrow_mut().clear();

        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        let is_empty = manager.handle_instrument_expired(&unknown);

        // The catalog was empty to begin with, and nothing may be queued
        assert!(is_empty);
        assert!(queue.borrow().is_empty());
    }

    #[rstest]
    fn test_publish_slice_pushes_expire_series_when_expired() {
        let (mut manager, queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        queue.borrow_mut().clear();

        // Publishing exactly at the expiry timestamp counts as expired
        let expiry_ns = manager.aggregator.series_id().expiration_ns;
        manager.publish_slice(expiry_ns);

        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        assert_eq!(cmds.len(), 1);
        assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
    }

    #[rstest]
    fn test_expired_instrument_unsubscribes_include_instrument_status() {
        let (mut manager, queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        queue.borrow_mut().clear();

        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        manager.handle_instrument_expired(&expired_id);

        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        let status_unsubs = cmds
            .iter()
            .filter(|c| {
                matches!(
                    c,
                    DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
                )
            })
            .count();
        assert_eq!(status_unsubs, 1);
    }
}