Skip to main content

nautilus_data/option_chains/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Per-series option chain manager.
17//!
18//! Each [`OptionChainManager`] instance is self-contained: it owns its aggregator,
19//! msgbus handlers, and timer for a single option series. The `DataEngine` holds
20//! one manager per active series in
21//! `AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>`.
22
23use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26    cache::Cache,
27    clock::Clock,
28    messages::data::{
29        SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30        SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31        UnsubscribeQuotes,
32    },
33    msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34    timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38    data::{QuoteTick, option_chain::OptionGreeks},
39    enums::OptionKind,
40    identifiers::{InstrumentId, OptionSeriesId, Venue},
41    instruments::Instrument,
42    types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47    AtmTracker, OptionChainAggregator,
48    handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51    client::DataClientAdapter,
52    engine::{DeferredCommand, DeferredCommandQueue},
53};
54
/// Per-series option chain manager.
///
/// Each instance manages a single option series: its aggregator,
/// handlers, timer, and lifecycle. The `DataEngine` holds one
/// manager per active series.
///
/// Instances are built through `create_and_setup`, which returns the manager
/// inside an `Rc<RefCell<_>>` so the msgbus handlers and the snapshot timer
/// publisher can be constructed from that shared cell.
#[derive(Debug)]
pub struct OptionChainManager {
    /// Aggregator for this series: holds the instrument catalog and active set,
    /// receives quote/greeks updates, and produces the slices that get published.
    aggregator: OptionChainAggregator,
    /// Msgbus topic that option chain slices for this series are published to.
    topic: MStr<Topic>,
    /// Quote handlers; the first element is the prototype that later msgbus
    /// (un)subscriptions reuse. Emptied by `teardown`.
    quote_handlers: Vec<TypedHandler<QuoteTick>>,
    /// Greeks handlers; the first element is the prototype that later msgbus
    /// (un)subscriptions reuse. Emptied by `teardown`.
    greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
    /// Name of the snapshot timer, or `None` when no snapshot interval was
    /// requested (raw mode) or after `teardown` has taken it.
    timer_name: Option<Ustr>,
    /// Priority passed to the msgbus whenever a quote/greeks handler is subscribed.
    msgbus_priority: u32,
    /// Whether the first ATM price has been received and the active set bootstrapped.
    bootstrapped: bool,
    /// Shared deferred command queue — the `DataEngine` drains this on each data tick.
    deferred_cmd_queue: DeferredCommandQueue,
    /// Clock reference for constructing command timestamps.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `true`, every quote/greeks update for an active instrument immediately publishes a snapshot.
    raw_mode: bool,
}
77
78impl OptionChainManager {
79    /// Factory method that creates a per-series manager, registers all msgbus
80    /// handlers, forwards subscribe commands to the data client, and sets up
81    /// the snapshot timer.
82    ///
83    /// Returns the manager wrapped in `Rc<RefCell<>>` (needed for `WeakCell`
84    /// handler pattern).
85    #[expect(clippy::too_many_arguments)]
86    pub(crate) fn create_and_setup(
87        series_id: OptionSeriesId,
88        cache: &Rc<RefCell<Cache>>,
89        cmd: &SubscribeOptionChain,
90        clock: &Rc<RefCell<dyn Clock>>,
91        msgbus_priority: u32,
92        client: Option<&mut DataClientAdapter>,
93        initial_atm_price: Option<Price>,
94        deferred_cmd_queue: DeferredCommandQueue,
95    ) -> Rc<RefCell<Self>> {
96        let topic = switchboard::get_option_chain_topic(series_id);
97        let instruments = Self::resolve_instruments(cache, &series_id);
98
99        let mut tracker = AtmTracker::new();
100
101        // Derive forward price precision from instrument strike prices
102        if let Some((strike, _)) = instruments.values().next() {
103            tracker.set_forward_precision(strike.precision);
104        }
105
106        if let Some(price) = initial_atm_price {
107            tracker.set_initial_price(price);
108            log::info!("Pre-populated ATM with forward price: {price}");
109        }
110        let aggregator =
111            OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113        // Initial active set for msgbus handlers (subset of all instruments).
114        // When ATM is unknown (ATM-based ranges), this is empty — deferred until bootstrap.
115        let active_instrument_ids = aggregator.instrument_ids();
116        let all_instrument_ids = aggregator.all_instrument_ids();
117        // If active set is already populated (Fixed range or ATM provided), we're bootstrapped
118        let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120        let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122        let manager = Self {
123            aggregator,
124            topic,
125            quote_handlers: Vec::new(),
126            greeks_handlers: Vec::new(),
127            timer_name: None,
128            msgbus_priority,
129            bootstrapped,
130            deferred_cmd_queue,
131            clock: clock.clone(),
132            raw_mode,
133        };
134        let manager_rc = Rc::new(RefCell::new(manager));
135
136        // Register msgbus handlers for initial active set only
137        let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138            &manager_rc,
139            &active_instrument_ids,
140            series_id,
141            msgbus_priority,
142        );
143        let greeks_handlers = Self::register_greeks_handlers(
144            &manager_rc,
145            &active_instrument_ids,
146            series_id,
147            msgbus_priority,
148        );
149
150        // Forward wire-level subscriptions for the active set.
151        // When ATM is unknown, active set is empty — deferred until bootstrap.
152        Self::forward_client_subscriptions(
153            client,
154            &active_instrument_ids,
155            cmd,
156            series_id.venue,
157            clock,
158        );
159
160        let timer_name = cmd
161            .snapshot_interval_ms
162            .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164        {
165            let mut mgr = manager_rc.borrow_mut();
166            mgr.quote_handlers = quote_handlers;
167            mgr.greeks_handlers = greeks_handlers;
168            mgr.timer_name = timer_name;
169        }
170
171        let mode_str = match cmd.snapshot_interval_ms {
172            Some(ms) => format!("interval={ms}ms"),
173            None => "mode=raw".to_string(),
174        };
175        log::info!(
176            "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177            active_instrument_ids.len(),
178            all_instrument_ids.len(),
179        );
180
181        manager_rc
182    }
183
184    /// Registers quote handlers on the msgbus for each instrument.
185    ///
186    /// Always stores the handler prototype as the first element so that
187    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
188    fn register_quote_handlers(
189        manager_rc: &Rc<RefCell<Self>>,
190        instrument_ids: &[InstrumentId],
191        series_id: OptionSeriesId,
192        priority: u32,
193    ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194        let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195        // Always store prototype as first element for bootstrap cloning
196        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197        handlers.push(quote_handler.clone());
198
199        for instrument_id in instrument_ids {
200            let topic = switchboard::get_quotes_topic(*instrument_id);
201            msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
202            handlers.push(quote_handler.clone());
203        }
204        (handlers, quote_handler)
205    }
206
207    /// Registers greeks handlers on the msgbus for each instrument.
208    ///
209    /// Always stores the handler prototype as the first element so that
210    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
211    fn register_greeks_handlers(
212        manager_rc: &Rc<RefCell<Self>>,
213        instrument_ids: &[InstrumentId],
214        series_id: OptionSeriesId,
215        priority: u32,
216    ) -> Vec<TypedHandler<OptionGreeks>> {
217        let greeks_handler =
218            TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
219        // Always store prototype as first element for bootstrap cloning
220        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
221        handlers.push(greeks_handler.clone());
222
223        for instrument_id in instrument_ids {
224            let topic = switchboard::get_option_greeks_topic(*instrument_id);
225            msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
226            handlers.push(greeks_handler.clone());
227        }
228        handlers
229    }
230
231    /// Forwards subscribe commands to the data client for all instruments.
232    fn forward_client_subscriptions(
233        client: Option<&mut DataClientAdapter>,
234        instrument_ids: &[InstrumentId],
235        cmd: &SubscribeOptionChain,
236        venue: Venue,
237        clock: &Rc<RefCell<dyn Clock>>,
238    ) {
239        let ts_init = clock.borrow().timestamp_ns();
240
241        let Some(client) = client else {
242            log::error!(
243                "Cannot forward option chain subscriptions: no client found for venue={venue}",
244            );
245            return;
246        };
247
248        for instrument_id in instrument_ids {
249            client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
250                instrument_id: *instrument_id,
251                client_id: cmd.client_id,
252                venue: Some(venue),
253                command_id: UUID4::new(),
254                ts_init,
255                correlation_id: None,
256                params: None,
257            }));
258            client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
259                instrument_id: *instrument_id,
260                client_id: cmd.client_id,
261                venue: Some(venue),
262                command_id: UUID4::new(),
263                ts_init,
264                correlation_id: None,
265                params: None,
266            }));
267            client.execute_subscribe(SubscribeCommand::InstrumentStatus(
268                SubscribeInstrumentStatus {
269                    instrument_id: *instrument_id,
270                    client_id: cmd.client_id,
271                    venue: Some(venue),
272                    command_id: UUID4::new(),
273                    ts_init,
274                    correlation_id: None,
275                    params: None,
276                },
277            ));
278        }
279
280        log::info!(
281            "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
282            instrument_ids.len(),
283        );
284    }
285
286    /// Sets up the snapshot timer for periodic publishing.
287    fn setup_timer(
288        manager_rc: &Rc<RefCell<Self>>,
289        series_id: OptionSeriesId,
290        interval_ms: u64,
291        clock: &Rc<RefCell<dyn Clock>>,
292    ) -> Ustr {
293        let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
294        let publisher = OptionChainSlicePublisher::new(manager_rc);
295        let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
296
297        let now_ns = clock.borrow().timestamp_ns().as_u64();
298        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
299
300        let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
301        let callback = TimeEventCallback::from(callback_fn);
302
303        clock
304            .borrow_mut()
305            .set_timer_ns(
306                &timer_name,
307                interval_ns,
308                Some(start_time_ns.into()),
309                None,
310                Some(callback),
311                None,
312                None,
313            )
314            .expect(FAILED);
315
316        timer_name
317    }
318
319    /// Returns all instrument IDs in the full catalog (not just the active set).
320    #[must_use]
321    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
322        self.aggregator.all_instrument_ids()
323    }
324
325    /// Returns the venue for this option chain.
326    #[must_use]
327    pub fn venue(&self) -> Venue {
328        self.aggregator.series_id().venue
329    }
330
331    /// Tears down this manager: unregisters all msgbus handlers and cancels the timer.
332    pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
333        // Unsubscribe from all currently active instruments
334        let instrument_ids = self.aggregator.instrument_ids();
335
336        // Unregister quote handlers
337        if let Some(handler) = self.quote_handlers.first() {
338            for instrument_id in &instrument_ids {
339                let topic = switchboard::get_quotes_topic(*instrument_id);
340                msgbus::unsubscribe_quotes(topic.into(), handler);
341            }
342        }
343
344        // Unregister greeks handlers
345        if let Some(handler) = self.greeks_handlers.first() {
346            for instrument_id in &instrument_ids {
347                let topic = switchboard::get_option_greeks_topic(*instrument_id);
348                msgbus::unsubscribe_option_greeks(topic.into(), handler);
349            }
350        }
351
352        // Cancel timer
353        if let Some(timer_name) = self.timer_name.take() {
354            let mut clk = clock.borrow_mut();
355            if clk.timer_exists(&timer_name) {
356                clk.cancel_timer(&timer_name);
357            }
358        }
359
360        self.quote_handlers.clear();
361        self.greeks_handlers.clear();
362    }
363
364    /// Routes incoming greeks to the aggregator.
365    ///
366    /// Also updates the ATM tracker from the forward price if `ForwardPrice` source is active,
367    /// and triggers deferred bootstrap on the first arrival.
368    pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
369        if self.aggregator.is_expired(greeks.ts_event) {
370            log::warn!(
371                "Dropping greeks for {}, series {} expired",
372                greeks.instrument_id,
373                self.aggregator.series_id(),
374            );
375            self.deferred_cmd_queue
376                .borrow_mut()
377                .push_back(DeferredCommand::ExpireInstrument(greeks.instrument_id));
378            return;
379        }
380
381        // Update ATM tracker from forward price (ForwardPrice source only)
382        self.aggregator
383            .atm_tracker_mut()
384            .update_from_option_greeks(greeks);
385        // Route greeks to aggregator for storage
386        self.aggregator.update_greeks(greeks);
387        // Check if first ATM arrival triggers deferred bootstrap
388        self.maybe_bootstrap();
389
390        if self.raw_mode
391            && self.bootstrapped
392            && self.aggregator.active_ids().contains(&greeks.instrument_id)
393        {
394            self.publish_slice(greeks.ts_event);
395        }
396    }
397
398    /// Handles an expired/settled instrument by removing it from the aggregator,
399    /// unregistering msgbus handlers, and pushing deferred wire unsubscribes.
400    ///
401    /// Returns `true` if the aggregator catalog is now empty (all instruments expired),
402    /// signaling the engine to tear down this entire manager.
403    pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
404        let was_active = self.aggregator.active_ids().contains(instrument_id);
405
406        if !self.aggregator.remove_instrument(instrument_id) {
407            return self.aggregator.is_catalog_empty();
408        }
409
410        if was_active {
411            // Unregister msgbus handlers for this instrument
412            if let Some(qh) = self.quote_handlers.first() {
413                let topic = switchboard::get_quotes_topic(*instrument_id);
414                msgbus::unsubscribe_quotes(topic.into(), qh);
415            }
416
417            if let Some(gh) = self.greeks_handlers.first() {
418                let topic = switchboard::get_option_greeks_topic(*instrument_id);
419                msgbus::unsubscribe_option_greeks(topic.into(), gh);
420            }
421
422            // Push deferred wire unsubscribes
423            self.push_unsubscribe_commands(*instrument_id);
424        }
425
426        log::info!(
427            "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
428            self.aggregator.series_id(),
429            self.aggregator.instruments().len(),
430        );
431
432        self.aggregator.is_catalog_empty()
433    }
434
435    /// Routes an incoming quote tick to the aggregator, then bootstraps if ready.
436    ///
437    /// This handles both option instrument quotes (aggregator) and ATM source quotes
438    /// (the aggregator's ATM tracker handles filtering internally).
439    pub fn handle_quote(&mut self, quote: &QuoteTick) {
440        if self.aggregator.is_expired(quote.ts_event) {
441            log::warn!(
442                "Dropping quote for {}, series {} expired",
443                quote.instrument_id,
444                self.aggregator.series_id(),
445            );
446            self.deferred_cmd_queue
447                .borrow_mut()
448                .push_back(DeferredCommand::ExpireInstrument(quote.instrument_id));
449            return;
450        }
451
452        self.aggregator.update_quote(quote);
453        self.maybe_bootstrap();
454
455        if self.raw_mode
456            && self.bootstrapped
457            && self.aggregator.active_ids().contains(&quote.instrument_id)
458        {
459            self.publish_slice(quote.ts_event);
460        }
461    }
462
463    /// Bootstraps the active instrument set on the first ATM price arrival.
464    ///
465    /// Computes active strikes, registers msgbus handlers for those instruments,
466    /// and pushes deferred wire subscriptions into the shared command queue.
467    fn maybe_bootstrap(&mut self) {
468        if self.bootstrapped {
469            return;
470        }
471
472        if self.aggregator.atm_tracker().atm_price().is_none() {
473            return;
474        }
475
476        // First ATM received — compute active set and register handlers
477        let active_ids = self.aggregator.recompute_active_set();
478        self.register_handlers_for_instruments_bulk(&active_ids);
479
480        for &id in &active_ids {
481            self.push_subscribe_commands(id);
482        }
483
484        self.bootstrapped = true;
485
486        log::info!(
487            "Bootstrapped option chain for {} ({} active instruments)",
488            self.aggregator.series_id(),
489            active_ids.len(),
490        );
491    }
492
493    /// Registers msgbus handlers for a batch of instruments.
494    fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
495        for &id in instrument_ids {
496            self.register_handlers_for_instrument(id);
497        }
498    }
499
500    /// Adds a dynamically discovered instrument to this option chain.
501    ///
502    /// Registers msgbus handlers when the instrument falls in the active
503    /// range and forwards wire-level subscriptions via `client`.
504    /// Returns `true` if the instrument was newly inserted.
505    pub fn add_instrument(
506        &mut self,
507        instrument_id: InstrumentId,
508        strike: Price,
509        kind: OptionKind,
510        client: Option<&mut DataClientAdapter>,
511        clock: &Rc<RefCell<dyn Clock>>,
512    ) -> bool {
513        if !self.aggregator.add_instrument(instrument_id, strike, kind) {
514            return false;
515        }
516
517        if self.aggregator.active_ids().contains(&instrument_id) {
518            self.register_handlers_for_instrument(instrument_id);
519        }
520
521        let venue = self.aggregator.series_id().venue;
522        Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
523
524        log::info!(
525            "Added instrument {instrument_id} to option chain {} (active={})",
526            self.aggregator.series_id(),
527            self.aggregator.active_ids().contains(&instrument_id),
528        );
529
530        true
531    }
532
533    fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
534        if let Some(qh) = self.quote_handlers.first().cloned() {
535            let topic = switchboard::get_quotes_topic(instrument_id);
536            msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
537        }
538
539        if let Some(gh) = self.greeks_handlers.first().cloned() {
540            let topic = switchboard::get_option_greeks_topic(instrument_id);
541            msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
542        }
543    }
544
545    /// Pushes deferred subscribe commands (quotes, greeks, instrument status) for a single instrument.
546    fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
547        let venue = self.aggregator.series_id().venue;
548        let ts_init = self.clock.borrow().timestamp_ns();
549        let mut queue = self.deferred_cmd_queue.borrow_mut();
550        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
551            SubscribeQuotes {
552                instrument_id,
553                client_id: None,
554                venue: Some(venue),
555                command_id: UUID4::new(),
556                ts_init,
557                correlation_id: None,
558                params: None,
559            },
560        )));
561        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
562            SubscribeOptionGreeks {
563                instrument_id,
564                client_id: None,
565                venue: Some(venue),
566                command_id: UUID4::new(),
567                ts_init,
568                correlation_id: None,
569                params: None,
570            },
571        )));
572        queue.push_back(DeferredCommand::Subscribe(
573            SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
574                instrument_id,
575                client_id: None,
576                venue: Some(venue),
577                command_id: UUID4::new(),
578                ts_init,
579                correlation_id: None,
580                params: None,
581            }),
582        ));
583    }
584
585    /// Pushes deferred unsubscribe commands (quotes, greeks, instrument status) for a single instrument.
586    fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
587        let venue = self.aggregator.series_id().venue;
588        let ts_init = self.clock.borrow().timestamp_ns();
589        let mut queue = self.deferred_cmd_queue.borrow_mut();
590        queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
591            UnsubscribeQuotes {
592                instrument_id,
593                client_id: None,
594                venue: Some(venue),
595                command_id: UUID4::new(),
596                ts_init,
597                correlation_id: None,
598                params: None,
599            },
600        )));
601        queue.push_back(DeferredCommand::Unsubscribe(
602            UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
603                instrument_id,
604                client_id: None,
605                venue: Some(venue),
606                command_id: UUID4::new(),
607                ts_init,
608                correlation_id: None,
609                params: None,
610            }),
611        ));
612        queue.push_back(DeferredCommand::Unsubscribe(
613            UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
614                instrument_id,
615                client_id: None,
616                venue: Some(venue),
617                command_id: UUID4::new(),
618                ts_init,
619                correlation_id: None,
620                params: None,
621            }),
622        ));
623    }
624
625    /// Forwards quote, greeks, and instrument status subscriptions for a single instrument.
626    fn forward_instrument_subscriptions(
627        client: Option<&mut DataClientAdapter>,
628        instrument_id: InstrumentId,
629        venue: Venue,
630        clock: &Rc<RefCell<dyn Clock>>,
631    ) {
632        let Some(client) = client else {
633            log::error!(
634                "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
635            );
636            return;
637        };
638
639        let ts_init = clock.borrow().timestamp_ns();
640
641        client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
642            instrument_id,
643            client_id: None,
644            venue: Some(venue),
645            command_id: UUID4::new(),
646            ts_init,
647            correlation_id: None,
648            params: None,
649        }));
650        client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
651            instrument_id,
652            client_id: None,
653            venue: Some(venue),
654            command_id: UUID4::new(),
655            ts_init,
656            correlation_id: None,
657            params: None,
658        }));
659        client.execute_subscribe(SubscribeCommand::InstrumentStatus(
660            SubscribeInstrumentStatus {
661                instrument_id,
662                client_id: None,
663                venue: Some(venue),
664                command_id: UUID4::new(),
665                ts_init,
666                correlation_id: None,
667                params: None,
668            },
669        ));
670    }
671
672    /// Checks if ATM has shifted and rebalances msgbus subscriptions if needed.
673    fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
674        let Some(action) = self.aggregator.check_rebalance(now_ns) else {
675            return;
676        };
677
678        // Unsubscribe removed instruments from msgbus
679        if let Some(qh) = self.quote_handlers.first() {
680            for id in &action.remove {
681                msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
682            }
683        }
684
685        if let Some(gh) = self.greeks_handlers.first() {
686            for id in &action.remove {
687                msgbus::unsubscribe_option_greeks(
688                    switchboard::get_option_greeks_topic(*id).into(),
689                    gh,
690                );
691            }
692        }
693
694        // Subscribe new instruments on msgbus
695        if let Some(qh) = self.quote_handlers.first().cloned() {
696            for id in &action.add {
697                msgbus::subscribe_quotes(
698                    switchboard::get_quotes_topic(*id).into(),
699                    qh.clone(),
700                    Some(self.msgbus_priority),
701                );
702            }
703        }
704
705        if let Some(gh) = self.greeks_handlers.first().cloned() {
706            for id in &action.add {
707                msgbus::subscribe_option_greeks(
708                    switchboard::get_option_greeks_topic(*id).into(),
709                    gh.clone(),
710                    Some(self.msgbus_priority),
711                );
712            }
713        }
714
715        // Push deferred wire-level changes into the shared command queue
716        for &id in &action.add {
717            self.push_subscribe_commands(id);
718        }
719
720        for &id in &action.remove {
721            self.push_unsubscribe_commands(id);
722        }
723
724        if !action.add.is_empty() || !action.remove.is_empty() {
725            log::info!(
726                "Rebalanced option chain for {}: +{} -{} instruments",
727                self.aggregator.series_id(),
728                action.add.len(),
729                action.remove.len(),
730            );
731        }
732
733        // Apply state changes to aggregator
734        self.aggregator.apply_rebalance(&action, now_ns);
735    }
736
737    /// Takes the accumulated snapshot and publishes it to the msgbus.
738    pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
739        // Proactive expiry safeguard
740        if self.aggregator.is_expired(ts) {
741            self.deferred_cmd_queue
742                .borrow_mut()
743                .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
744            return;
745        }
746
747        self.maybe_rebalance(ts);
748
749        let series_id = self.aggregator.series_id();
750        let slice = self.aggregator.snapshot(ts);
751
752        if slice.is_empty() {
753            log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
754            return;
755        }
756
757        log::debug!(
758            "Publishing OptionChainSlice for {} (calls={}, puts={})",
759            series_id,
760            slice.call_count(),
761            slice.put_count(),
762        );
763        msgbus::publish_option_chain(self.topic, &slice);
764    }
765
766    /// Resolves instruments from cache that match the given option series.
767    fn resolve_instruments(
768        cache: &Rc<RefCell<Cache>>,
769        series_id: &OptionSeriesId,
770    ) -> HashMap<InstrumentId, (Price, OptionKind)> {
771        let cache = cache.borrow();
772        let mut map = HashMap::new();
773
774        for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
775            let Some(expiration) = instrument.expiration_ns() else {
776                continue;
777            };
778
779            if expiration != series_id.expiration_ns {
780                continue;
781            }
782
783            if instrument.settlement_currency().code != series_id.settlement_currency {
784                continue;
785            }
786
787            let Some(strike) = instrument.strike_price() else {
788                continue;
789            };
790
791            let Some(kind) = instrument.option_kind() else {
792                continue;
793            };
794
795            map.insert(instrument.id(), (strike, kind));
796        }
797
798        map
799    }
800}
801
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use nautilus_common::clock::TestClock;
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::option_chain::{OptionGreeks, StrikeRange},
        identifiers::Venue,
        types::Quantity,
    };
    use rstest::*;

    use super::*;

    /// Instrument ID of the 50000-strike call used throughout these tests.
    const CALL_50K_ID: &str = "BTC-20240101-50000-C.DERIBIT";

    /// Returns the single option series every test manager is built for.
    fn make_series_id() -> OptionSeriesId {
        OptionSeriesId::new(
            Venue::new("DERIBIT"),
            ustr::Ustr::from("BTC"),
            ustr::Ustr::from("BTC"),
            UnixNanos::from(1_700_000_000_000_000_000u64),
        )
    }

    /// Returns an empty deferred command queue.
    fn make_test_queue() -> DeferredCommandQueue {
        Rc::new(RefCell::new(VecDeque::new()))
    }

    /// Builds a manager around a fresh aggregator for the test series.
    ///
    /// The manager has no msgbus handlers and no timer, uses its own `TestClock`,
    /// and is wired to a new deferred command queue. The second tuple element is
    /// a handle to that same queue so tests can inspect what the manager pushed.
    fn build_manager(
        strike_range: StrikeRange,
        instruments: HashMap<InstrumentId, (Price, OptionKind)>,
        bootstrapped: bool,
    ) -> (OptionChainManager, DeferredCommandQueue) {
        let series_id = make_series_id();
        let topic = switchboard::get_option_chain_topic(series_id);
        let aggregator =
            OptionChainAggregator::new(series_id, strike_range, AtmTracker::new(), instruments);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let queue = make_test_queue();

        let manager = OptionChainManager {
            aggregator,
            topic,
            quote_handlers: Vec::new(),
            greeks_handlers: Vec::new(),
            timer_name: None,
            msgbus_priority: 0,
            bootstrapped,
            deferred_cmd_queue: queue.clone(),
            clock,
            raw_mode: false,
        };
        (manager, queue)
    }

    /// Manager with an empty fixed strike range and no instruments, already bootstrapped.
    fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
        build_manager(StrikeRange::Fixed(vec![]), HashMap::new(), true)
    }

    /// Manager with calls and puts at five strikes and an ATM-relative range of
    /// one strike either side, not yet bootstrapped.
    fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
        let mut instruments = HashMap::new();

        for s in [45000, 47500, 50000, 52500, 55000] {
            let strike = Price::from(&s.to_string());
            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
            instruments.insert(call_id, (strike, OptionKind::Call));
            instruments.insert(put_id, (strike, OptionKind::Put));
        }

        build_manager(
            StrikeRange::AtmRelative {
                strikes_above: 1,
                strikes_below: 1,
            },
            instruments,
            false,
        )
    }

    /// Feeds greeks for the 50000 call carrying an underlying price of 50000.
    fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from(CALL_50K_ID),
            underlying_price: Some(50000.0),
            ..Default::default()
        };
        manager.handle_greeks(&greeks);
    }

    /// ATM-relative manager after `bootstrap_via_greeks`, with the commands
    /// queued up to that point discarded.
    fn make_bootstrapped_manager() -> (OptionChainManager, DeferredCommandQueue) {
        let (mut manager, queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        queue.borrow_mut().clear();
        (manager, queue)
    }

    #[rstest]
    fn test_manager_handle_quote_no_instrument() {
        let (mut manager, _queue) = make_manager();

        // Should not panic — quote for unknown instrument
        let quote = QuoteTick::new(
            InstrumentId::from(CALL_50K_ID),
            Price::from("100.00"),
            Price::from("101.00"),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(1u64),
            UnixNanos::from(1u64),
        );
        manager.handle_quote(&quote);
    }

    #[rstest]
    fn test_manager_publish_slice_empty() {
        let (mut manager, _queue) = make_manager();
        // Should not panic — empty slice skips publish
        manager.publish_slice(UnixNanos::from(100u64));
    }

    #[rstest]
    fn test_manager_teardown_no_handlers() {
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let (mut manager, _queue) = make_manager();
        // Should not panic — no handlers to unregister
        manager.teardown(&clock);
        assert!(manager.quote_handlers.is_empty());
    }

    #[rstest]
    fn test_manager_publish_slice_triggers_rebalance() {
        let (mut manager, queue) = make_option_chain_manager();
        // Initially no instruments active (ATM unknown, deferred)
        assert_eq!(manager.aggregator.instrument_ids().len(), 0);

        // Feed ATM near 50000 via greeks — bootstrap computes active set (3 strikes × 2 = 6)
        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2

        // Deferred queue should contain subscribe commands (6 instruments × 3 = 18 commands)
        assert_eq!(queue.borrow().len(), 18);

        // publish_slice should still work normally after bootstrap
        manager.publish_slice(UnixNanos::from(100u64));
        assert!(manager.aggregator.last_atm_strike().is_some());
    }

    #[rstest]
    fn test_manager_add_instrument_new() {
        let (mut manager, _queue) = make_option_chain_manager();
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
        let strike = Price::from("57500");
        let count_before = manager.aggregator.instruments().len();

        let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);

        assert!(result);
        assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
    }

    #[rstest]
    fn test_manager_add_instrument_already_known() {
        let (mut manager, _queue) = make_option_chain_manager();
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let existing_id = InstrumentId::from(CALL_50K_ID);
        let strike = Price::from("50000");
        let count_before = manager.aggregator.instruments().len();

        let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);

        assert!(!result);
        assert_eq!(manager.aggregator.instruments().len(), count_before);
    }

    #[rstest]
    fn test_manager_deferred_bootstrap_on_first_atm() {
        let (mut manager, queue) = make_option_chain_manager();
        // Initially not bootstrapped, no active instruments
        assert!(!manager.bootstrapped);
        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
        assert!(queue.borrow().is_empty());

        // Feed ATM via greeks → triggers bootstrap
        bootstrap_via_greeks(&mut manager);

        assert!(manager.bootstrapped);
        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2
        // 6 instruments × 3 commands each (quotes + greeks + instrument_status) = 18 deferred commands
        assert_eq!(queue.borrow().len(), 18);

        // All commands should be Subscribe variants
        assert!(
            queue
                .borrow()
                .iter()
                .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
        );
    }

    #[rstest]
    fn test_manager_bootstrap_idempotent() {
        let (mut manager, _queue) = make_option_chain_manager();
        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        let count = manager.aggregator.instrument_ids().len();

        // Feed another ATM update — bootstrap should not fire again
        let greeks2 = OptionGreeks {
            instrument_id: InstrumentId::from(CALL_50K_ID),
            underlying_price: Some(50200.0),
            ..Default::default()
        };
        manager.handle_greeks(&greeks2);
        assert_eq!(manager.aggregator.instrument_ids().len(), count);
    }

    #[rstest]
    fn test_manager_fixed_range_bootstrapped_immediately() {
        // Fixed range manager is bootstrapped at creation (no ATM needed)
        let (manager, queue) = make_manager();
        assert!(manager.bootstrapped);
        assert!(queue.borrow().is_empty());
    }

    #[rstest]
    fn test_manager_forward_price_bootstrap_from_greeks() {
        let (mut manager, _queue) = make_option_chain_manager();
        assert!(!manager.bootstrapped);

        // First greeks with underlying_price → updates ATM tracker and triggers bootstrap
        bootstrap_via_greeks(&mut manager);
        assert!(manager.bootstrapped);
        // 3 strikes × 2 sides = 6 active instruments
        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
    }

    #[rstest]
    fn test_manager_forward_price_no_bootstrap_without_underlying() {
        let (mut manager, _queue) = make_option_chain_manager();
        assert!(!manager.bootstrapped);

        // Greeks with no underlying_price → should not bootstrap
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from(CALL_50K_ID),
            underlying_price: None,
            ..Default::default()
        };
        manager.handle_greeks(&greeks);
        assert!(!manager.bootstrapped);
    }

    #[rstest]
    fn test_handle_instrument_expired_removes_from_aggregator() {
        // Bootstrap so instruments are active; bootstrap commands are cleared
        let (mut manager, _queue) = make_bootstrapped_manager();
        assert!(manager.bootstrapped);
        let initial_count = manager.aggregator.instruments().len();

        let expired_id = InstrumentId::from(CALL_50K_ID);
        let is_empty = manager.handle_instrument_expired(&expired_id);

        assert!(!is_empty);
        assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
        assert!(!manager.aggregator.active_ids().contains(&expired_id));
    }

    #[rstest]
    fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
        let (mut manager, queue) = make_bootstrapped_manager();

        let expired_id = InstrumentId::from(CALL_50K_ID);
        manager.handle_instrument_expired(&expired_id);

        // Should push 3 unsubscribe commands (quotes + greeks + instrument_status)
        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        assert_eq!(cmds.len(), 3);
        assert!(
            cmds.iter()
                .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
        );
    }

    #[rstest]
    fn test_handle_instrument_expired_returns_true_when_last() {
        // Catalog holds exactly one instrument, so expiring it empties the catalog
        let call_id = InstrumentId::from(CALL_50K_ID);
        let strike = Price::from("50000");
        let mut instruments = HashMap::new();
        instruments.insert(call_id, (strike, OptionKind::Call));
        let (mut manager, _queue) =
            build_manager(StrikeRange::Fixed(vec![strike]), instruments, true);

        let is_empty = manager.handle_instrument_expired(&call_id);
        assert!(is_empty);
        assert!(manager.aggregator.is_catalog_empty());
    }

    #[rstest]
    fn test_handle_instrument_expired_unknown_noop() {
        let (mut manager, queue) = make_manager();
        queue.borrow_mut().clear();

        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        let is_empty = manager.handle_instrument_expired(&unknown);

        // Empty manager returns true (catalog was already empty)
        assert!(is_empty);
        assert!(queue.borrow().is_empty()); // no deferred commands pushed
    }

    #[rstest]
    fn test_publish_slice_pushes_expire_series_when_expired() {
        let (mut manager, queue) = make_bootstrapped_manager();

        // Publish at the expiration timestamp — should push ExpireSeries, not publish
        let expiry_ns = manager.aggregator.series_id().expiration_ns;
        manager.publish_slice(expiry_ns);

        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        assert_eq!(cmds.len(), 1);
        assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
    }

    #[rstest]
    fn test_expired_instrument_unsubscribes_include_instrument_status() {
        let (mut manager, queue) = make_bootstrapped_manager();

        let expired_id = InstrumentId::from(CALL_50K_ID);
        manager.handle_instrument_expired(&expired_id);

        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
        // Should have exactly one InstrumentStatus unsubscribe among the 3
        let status_unsubs = cmds
            .iter()
            .filter(|c| {
                matches!(
                    c,
                    DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
                )
            })
            .count();
        assert_eq!(status_unsubs, 1);
    }
}