Skip to main content

nautilus_data/option_chains/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Per-series option chain manager.
17//!
18//! Each [`OptionChainManager`] instance is self-contained: it owns its aggregator,
19//! msgbus handlers, and timer for a single option series. The `DataEngine` holds
20//! one manager per active series in
21//! `AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>`.
22
23use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26    cache::Cache,
27    clock::Clock,
28    messages::data::{
29        SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30        SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31        UnsubscribeQuotes,
32    },
33    msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34    timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38    data::{QuoteTick, option_chain::OptionGreeks},
39    enums::OptionKind,
40    identifiers::{InstrumentId, OptionSeriesId, Venue},
41    instruments::Instrument,
42    types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47    AtmTracker, OptionChainAggregator,
48    handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51    client::DataClientAdapter,
52    engine::{DeferredCommand, DeferredCommandQueue},
53};
54
55/// Per-series option chain manager.
56///
57/// Each instance manages a single option series: its aggregator,
58/// handlers, timer, and lifecycle. The `DataEngine` holds one
59/// manager per active series.
60#[derive(Debug)]
61pub struct OptionChainManager {
62    aggregator: OptionChainAggregator,
63    topic: MStr<Topic>,
64    quote_handlers: Vec<TypedHandler<QuoteTick>>,
65    greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
66    timer_name: Option<Ustr>,
67    msgbus_priority: u8,
68    /// Whether the first ATM price has been received and the active set bootstrapped.
69    bootstrapped: bool,
70    /// Shared deferred command queue — the `DataEngine` drains this on each data tick.
71    deferred_cmd_queue: DeferredCommandQueue,
72    /// Clock reference for constructing command timestamps.
73    clock: Rc<RefCell<dyn Clock>>,
74    /// When `true`, every quote/greeks update for an active instrument immediately publishes a snapshot.
75    raw_mode: bool,
76}
77
78impl OptionChainManager {
79    /// Factory method that creates a per-series manager, registers all msgbus
80    /// handlers, forwards subscribe commands to the data client, and sets up
81    /// the snapshot timer.
82    ///
83    /// Returns the manager wrapped in `Rc<RefCell<>>` (needed for `WeakCell`
84    /// handler pattern).
85    #[allow(clippy::too_many_arguments)]
86    pub(crate) fn create_and_setup(
87        series_id: OptionSeriesId,
88        cache: &Rc<RefCell<Cache>>,
89        cmd: &SubscribeOptionChain,
90        clock: &Rc<RefCell<dyn Clock>>,
91        msgbus_priority: u8,
92        client: Option<&mut DataClientAdapter>,
93        initial_atm_price: Option<Price>,
94        deferred_cmd_queue: DeferredCommandQueue,
95    ) -> Rc<RefCell<Self>> {
96        let topic = switchboard::get_option_chain_topic(series_id);
97        let instruments = Self::resolve_instruments(cache, &series_id);
98
99        let mut tracker = AtmTracker::new();
100
101        // Derive forward price precision from instrument strike prices
102        if let Some((strike, _)) = instruments.values().next() {
103            tracker.set_forward_precision(strike.precision);
104        }
105
106        if let Some(price) = initial_atm_price {
107            tracker.set_initial_price(price);
108            log::info!("Pre-populated ATM with forward price: {price}");
109        }
110        let aggregator =
111            OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113        // Initial active set for msgbus handlers (subset of all instruments).
114        // When ATM is unknown (ATM-based ranges), this is empty — deferred until bootstrap.
115        let active_instrument_ids = aggregator.instrument_ids();
116        let all_instrument_ids = aggregator.all_instrument_ids();
117        // If active set is already populated (Fixed range or ATM provided), we're bootstrapped
118        let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120        let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122        let manager = Self {
123            aggregator,
124            topic,
125            quote_handlers: Vec::new(),
126            greeks_handlers: Vec::new(),
127            timer_name: None,
128            msgbus_priority,
129            bootstrapped,
130            deferred_cmd_queue,
131            clock: clock.clone(),
132            raw_mode,
133        };
134        let manager_rc = Rc::new(RefCell::new(manager));
135
136        // Register msgbus handlers for initial active set only
137        let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138            &manager_rc,
139            &active_instrument_ids,
140            series_id,
141            msgbus_priority,
142        );
143        let greeks_handlers = Self::register_greeks_handlers(
144            &manager_rc,
145            &active_instrument_ids,
146            series_id,
147            msgbus_priority,
148        );
149
150        // Forward wire-level subscriptions for the active set.
151        // When ATM is unknown, active set is empty — deferred until bootstrap.
152        Self::forward_client_subscriptions(
153            client,
154            &active_instrument_ids,
155            cmd,
156            series_id.venue,
157            clock,
158        );
159
160        let timer_name = cmd
161            .snapshot_interval_ms
162            .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164        {
165            let mut mgr = manager_rc.borrow_mut();
166            mgr.quote_handlers = quote_handlers;
167            mgr.greeks_handlers = greeks_handlers;
168            mgr.timer_name = timer_name;
169        }
170
171        let mode_str = match cmd.snapshot_interval_ms {
172            Some(ms) => format!("interval={ms}ms"),
173            None => "mode=raw".to_string(),
174        };
175        log::info!(
176            "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177            active_instrument_ids.len(),
178            all_instrument_ids.len(),
179        );
180
181        manager_rc
182    }
183
184    /// Registers quote handlers on the msgbus for each instrument.
185    ///
186    /// Always stores the handler prototype as the first element so that
187    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
188    fn register_quote_handlers(
189        manager_rc: &Rc<RefCell<Self>>,
190        instrument_ids: &[InstrumentId],
191        series_id: OptionSeriesId,
192        priority: u8,
193    ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194        let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195        // Always store prototype as first element for bootstrap cloning
196        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197        handlers.push(quote_handler.clone());
198        for instrument_id in instrument_ids {
199            let topic = switchboard::get_quotes_topic(*instrument_id);
200            msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
201            handlers.push(quote_handler.clone());
202        }
203        (handlers, quote_handler)
204    }
205
206    /// Registers greeks handlers on the msgbus for each instrument.
207    ///
208    /// Always stores the handler prototype as the first element so that
209    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
210    fn register_greeks_handlers(
211        manager_rc: &Rc<RefCell<Self>>,
212        instrument_ids: &[InstrumentId],
213        series_id: OptionSeriesId,
214        priority: u8,
215    ) -> Vec<TypedHandler<OptionGreeks>> {
216        let greeks_handler =
217            TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
218        // Always store prototype as first element for bootstrap cloning
219        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
220        handlers.push(greeks_handler.clone());
221        for instrument_id in instrument_ids {
222            let topic = switchboard::get_option_greeks_topic(*instrument_id);
223            msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
224            handlers.push(greeks_handler.clone());
225        }
226        handlers
227    }
228
229    /// Forwards subscribe commands to the data client for all instruments.
230    fn forward_client_subscriptions(
231        client: Option<&mut DataClientAdapter>,
232        instrument_ids: &[InstrumentId],
233        cmd: &SubscribeOptionChain,
234        venue: Venue,
235        clock: &Rc<RefCell<dyn Clock>>,
236    ) {
237        let ts_init = clock.borrow().timestamp_ns();
238
239        let Some(client) = client else {
240            log::error!(
241                "Cannot forward option chain subscriptions: no client found for venue={venue}",
242            );
243            return;
244        };
245
246        for instrument_id in instrument_ids {
247            client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
248                instrument_id: *instrument_id,
249                client_id: cmd.client_id,
250                venue: Some(venue),
251                command_id: UUID4::new(),
252                ts_init,
253                correlation_id: None,
254                params: None,
255            }));
256            client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
257                instrument_id: *instrument_id,
258                client_id: cmd.client_id,
259                venue: Some(venue),
260                command_id: UUID4::new(),
261                ts_init,
262                correlation_id: None,
263                params: None,
264            }));
265            client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
266                SubscribeInstrumentStatus {
267                    instrument_id: *instrument_id,
268                    client_id: cmd.client_id,
269                    venue: Some(venue),
270                    command_id: UUID4::new(),
271                    ts_init,
272                    correlation_id: None,
273                    params: None,
274                },
275            ));
276        }
277
278        log::info!(
279            "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
280            instrument_ids.len(),
281        );
282    }
283
284    /// Sets up the snapshot timer for periodic publishing.
285    fn setup_timer(
286        manager_rc: &Rc<RefCell<Self>>,
287        series_id: OptionSeriesId,
288        interval_ms: u64,
289        clock: &Rc<RefCell<dyn Clock>>,
290    ) -> Ustr {
291        let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
292        let publisher = OptionChainSlicePublisher::new(manager_rc);
293        let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
294
295        let now_ns = clock.borrow().timestamp_ns().as_u64();
296        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
297
298        let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
299        let callback = TimeEventCallback::from(callback_fn);
300
301        clock
302            .borrow_mut()
303            .set_timer_ns(
304                &timer_name,
305                interval_ns,
306                Some(start_time_ns.into()),
307                None,
308                Some(callback),
309                None,
310                None,
311            )
312            .expect(FAILED);
313
314        timer_name
315    }
316
317    /// Returns all instrument IDs in the full catalog (not just the active set).
318    #[must_use]
319    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
320        self.aggregator.all_instrument_ids()
321    }
322
323    /// Returns the venue for this option chain.
324    #[must_use]
325    pub fn venue(&self) -> Venue {
326        self.aggregator.series_id().venue
327    }
328
329    /// Tears down this manager: unregisters all msgbus handlers and cancels the timer.
330    pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
331        // Unsubscribe from all currently active instruments
332        let instrument_ids = self.aggregator.instrument_ids();
333
334        // Unregister quote handlers
335        if let Some(handler) = self.quote_handlers.first() {
336            for instrument_id in &instrument_ids {
337                let topic = switchboard::get_quotes_topic(*instrument_id);
338                msgbus::unsubscribe_quotes(topic.into(), handler);
339            }
340        }
341
342        // Unregister greeks handlers
343        if let Some(handler) = self.greeks_handlers.first() {
344            for instrument_id in &instrument_ids {
345                let topic = switchboard::get_option_greeks_topic(*instrument_id);
346                msgbus::unsubscribe_option_greeks(topic.into(), handler);
347            }
348        }
349
350        // Cancel timer
351        if let Some(timer_name) = self.timer_name.take() {
352            let mut clk = clock.borrow_mut();
353            if clk.timer_exists(&timer_name) {
354                clk.cancel_timer(&timer_name);
355            }
356        }
357
358        self.quote_handlers.clear();
359        self.greeks_handlers.clear();
360    }
361
362    /// Routes incoming greeks to the aggregator.
363    ///
364    /// Also updates the ATM tracker from the forward price if `ForwardPrice` source is active,
365    /// and triggers deferred bootstrap on the first arrival.
366    pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
367        // Update ATM tracker from forward price (ForwardPrice source only)
368        self.aggregator
369            .atm_tracker_mut()
370            .update_from_option_greeks(greeks);
371        // Route greeks to aggregator for storage
372        self.aggregator.update_greeks(greeks);
373        // Check if first ATM arrival triggers deferred bootstrap
374        self.maybe_bootstrap();
375
376        if self.raw_mode
377            && self.bootstrapped
378            && self.aggregator.active_ids().contains(&greeks.instrument_id)
379        {
380            self.publish_slice(greeks.ts_event);
381        }
382    }
383
384    /// Handles an expired/settled instrument by removing it from the aggregator,
385    /// unregistering msgbus handlers, and pushing deferred wire unsubscribes.
386    ///
387    /// Returns `true` if the aggregator catalog is now empty (all instruments expired),
388    /// signaling the engine to tear down this entire manager.
389    pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
390        let was_active = self.aggregator.active_ids().contains(instrument_id);
391
392        if !self.aggregator.remove_instrument(instrument_id) {
393            return self.aggregator.is_catalog_empty();
394        }
395
396        if was_active {
397            // Unregister msgbus handlers for this instrument
398            if let Some(qh) = self.quote_handlers.first() {
399                let topic = switchboard::get_quotes_topic(*instrument_id);
400                msgbus::unsubscribe_quotes(topic.into(), qh);
401            }
402
403            if let Some(gh) = self.greeks_handlers.first() {
404                let topic = switchboard::get_option_greeks_topic(*instrument_id);
405                msgbus::unsubscribe_option_greeks(topic.into(), gh);
406            }
407
408            // Push deferred wire unsubscribes
409            self.push_unsubscribe_commands(*instrument_id);
410        }
411
412        log::info!(
413            "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
414            self.aggregator.series_id(),
415            self.aggregator.instruments().len(),
416        );
417
418        self.aggregator.is_catalog_empty()
419    }
420
421    /// Routes an incoming quote tick to the aggregator, then bootstraps if ready.
422    ///
423    /// This handles both option instrument quotes (aggregator) and ATM source quotes
424    /// (the aggregator's ATM tracker handles filtering internally).
425    pub fn handle_quote(&mut self, quote: &QuoteTick) {
426        self.aggregator.update_quote(quote);
427        self.maybe_bootstrap();
428
429        if self.raw_mode
430            && self.bootstrapped
431            && self.aggregator.active_ids().contains(&quote.instrument_id)
432        {
433            self.publish_slice(quote.ts_event);
434        }
435    }
436
437    /// Bootstraps the active instrument set on the first ATM price arrival.
438    ///
439    /// Computes active strikes, registers msgbus handlers for those instruments,
440    /// and pushes deferred wire subscriptions into the shared command queue.
441    fn maybe_bootstrap(&mut self) {
442        if self.bootstrapped {
443            return;
444        }
445
446        if self.aggregator.atm_tracker().atm_price().is_none() {
447            return;
448        }
449
450        // First ATM received — compute active set and register handlers
451        let active_ids = self.aggregator.recompute_active_set();
452        self.register_handlers_for_instruments_bulk(&active_ids);
453
454        for &id in &active_ids {
455            self.push_subscribe_commands(id);
456        }
457
458        self.bootstrapped = true;
459
460        log::info!(
461            "Bootstrapped option chain for {} ({} active instruments)",
462            self.aggregator.series_id(),
463            active_ids.len(),
464        );
465    }
466
467    /// Registers msgbus handlers for a batch of instruments.
468    fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
469        for &id in instrument_ids {
470            self.register_handlers_for_instrument(id);
471        }
472    }
473
474    /// Adds a dynamically discovered instrument to this option chain.
475    ///
476    /// Registers msgbus handlers when the instrument falls in the active
477    /// range and forwards wire-level subscriptions via `client`.
478    /// Returns `true` if the instrument was newly inserted.
479    pub fn add_instrument(
480        &mut self,
481        instrument_id: InstrumentId,
482        strike: Price,
483        kind: OptionKind,
484        client: Option<&mut DataClientAdapter>,
485        clock: &Rc<RefCell<dyn Clock>>,
486    ) -> bool {
487        if !self.aggregator.add_instrument(instrument_id, strike, kind) {
488            return false;
489        }
490
491        if self.aggregator.active_ids().contains(&instrument_id) {
492            self.register_handlers_for_instrument(instrument_id);
493        }
494
495        let venue = self.aggregator.series_id().venue;
496        Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
497
498        log::info!(
499            "Added instrument {instrument_id} to option chain {} (active={})",
500            self.aggregator.series_id(),
501            self.aggregator.active_ids().contains(&instrument_id),
502        );
503
504        true
505    }
506
507    fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
508        if let Some(qh) = self.quote_handlers.first().cloned() {
509            let topic = switchboard::get_quotes_topic(instrument_id);
510            msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
511        }
512
513        if let Some(gh) = self.greeks_handlers.first().cloned() {
514            let topic = switchboard::get_option_greeks_topic(instrument_id);
515            msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
516        }
517    }
518
519    /// Pushes deferred subscribe commands (quotes, greeks, instrument status) for a single instrument.
520    fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
521        let venue = self.aggregator.series_id().venue;
522        let ts_init = self.clock.borrow().timestamp_ns();
523        let mut queue = self.deferred_cmd_queue.borrow_mut();
524        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
525            SubscribeQuotes {
526                instrument_id,
527                client_id: None,
528                venue: Some(venue),
529                command_id: UUID4::new(),
530                ts_init,
531                correlation_id: None,
532                params: None,
533            },
534        )));
535        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
536            SubscribeOptionGreeks {
537                instrument_id,
538                client_id: None,
539                venue: Some(venue),
540                command_id: UUID4::new(),
541                ts_init,
542                correlation_id: None,
543                params: None,
544            },
545        )));
546        queue.push_back(DeferredCommand::Subscribe(
547            SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
548                instrument_id,
549                client_id: None,
550                venue: Some(venue),
551                command_id: UUID4::new(),
552                ts_init,
553                correlation_id: None,
554                params: None,
555            }),
556        ));
557    }
558
559    /// Pushes deferred unsubscribe commands (quotes, greeks, instrument status) for a single instrument.
560    fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
561        let venue = self.aggregator.series_id().venue;
562        let ts_init = self.clock.borrow().timestamp_ns();
563        let mut queue = self.deferred_cmd_queue.borrow_mut();
564        queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
565            UnsubscribeQuotes {
566                instrument_id,
567                client_id: None,
568                venue: Some(venue),
569                command_id: UUID4::new(),
570                ts_init,
571                correlation_id: None,
572                params: None,
573            },
574        )));
575        queue.push_back(DeferredCommand::Unsubscribe(
576            UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
577                instrument_id,
578                client_id: None,
579                venue: Some(venue),
580                command_id: UUID4::new(),
581                ts_init,
582                correlation_id: None,
583                params: None,
584            }),
585        ));
586        queue.push_back(DeferredCommand::Unsubscribe(
587            UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
588                instrument_id,
589                client_id: None,
590                venue: Some(venue),
591                command_id: UUID4::new(),
592                ts_init,
593                correlation_id: None,
594                params: None,
595            }),
596        ));
597    }
598
599    /// Forwards quote, greeks, and instrument status subscriptions for a single instrument.
600    fn forward_instrument_subscriptions(
601        client: Option<&mut DataClientAdapter>,
602        instrument_id: InstrumentId,
603        venue: Venue,
604        clock: &Rc<RefCell<dyn Clock>>,
605    ) {
606        let Some(client) = client else {
607            log::error!(
608                "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
609            );
610            return;
611        };
612
613        let ts_init = clock.borrow().timestamp_ns();
614
615        client.execute_subscribe(&SubscribeCommand::Quotes(SubscribeQuotes {
616            instrument_id,
617            client_id: None,
618            venue: Some(venue),
619            command_id: UUID4::new(),
620            ts_init,
621            correlation_id: None,
622            params: None,
623        }));
624        client.execute_subscribe(&SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
625            instrument_id,
626            client_id: None,
627            venue: Some(venue),
628            command_id: UUID4::new(),
629            ts_init,
630            correlation_id: None,
631            params: None,
632        }));
633        client.execute_subscribe(&SubscribeCommand::InstrumentStatus(
634            SubscribeInstrumentStatus {
635                instrument_id,
636                client_id: None,
637                venue: Some(venue),
638                command_id: UUID4::new(),
639                ts_init,
640                correlation_id: None,
641                params: None,
642            },
643        ));
644    }
645
646    /// Checks if ATM has shifted and rebalances msgbus subscriptions if needed.
647    fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
648        let Some(action) = self.aggregator.check_rebalance(now_ns) else {
649            return;
650        };
651
652        // Unsubscribe removed instruments from msgbus
653        if let Some(qh) = self.quote_handlers.first() {
654            for id in &action.remove {
655                msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
656            }
657        }
658
659        if let Some(gh) = self.greeks_handlers.first() {
660            for id in &action.remove {
661                msgbus::unsubscribe_option_greeks(
662                    switchboard::get_option_greeks_topic(*id).into(),
663                    gh,
664                );
665            }
666        }
667
668        // Subscribe new instruments on msgbus
669        if let Some(qh) = self.quote_handlers.first().cloned() {
670            for id in &action.add {
671                msgbus::subscribe_quotes(
672                    switchboard::get_quotes_topic(*id).into(),
673                    qh.clone(),
674                    Some(self.msgbus_priority),
675                );
676            }
677        }
678
679        if let Some(gh) = self.greeks_handlers.first().cloned() {
680            for id in &action.add {
681                msgbus::subscribe_option_greeks(
682                    switchboard::get_option_greeks_topic(*id).into(),
683                    gh.clone(),
684                    Some(self.msgbus_priority),
685                );
686            }
687        }
688
689        // Push deferred wire-level changes into the shared command queue
690        for &id in &action.add {
691            self.push_subscribe_commands(id);
692        }
693        for &id in &action.remove {
694            self.push_unsubscribe_commands(id);
695        }
696
697        if !action.add.is_empty() || !action.remove.is_empty() {
698            log::info!(
699                "Rebalanced option chain for {}: +{} -{} instruments",
700                self.aggregator.series_id(),
701                action.add.len(),
702                action.remove.len(),
703            );
704        }
705
706        // Apply state changes to aggregator
707        self.aggregator.apply_rebalance(&action, now_ns);
708    }
709
710    /// Takes the accumulated snapshot and publishes it to the msgbus.
711    pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
712        // Proactive expiry safeguard
713        if self.aggregator.is_expired(ts) {
714            self.deferred_cmd_queue
715                .borrow_mut()
716                .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
717            return;
718        }
719
720        self.maybe_rebalance(ts);
721
722        let series_id = self.aggregator.series_id();
723        let slice = self.aggregator.snapshot(ts);
724
725        if slice.is_empty() {
726            log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
727            return;
728        }
729
730        log::debug!(
731            "Publishing OptionChainSlice for {} (calls={}, puts={})",
732            series_id,
733            slice.call_count(),
734            slice.put_count(),
735        );
736        msgbus::publish_option_chain(self.topic, &slice);
737    }
738
739    /// Resolves instruments from cache that match the given option series.
740    fn resolve_instruments(
741        cache: &Rc<RefCell<Cache>>,
742        series_id: &OptionSeriesId,
743    ) -> HashMap<InstrumentId, (Price, OptionKind)> {
744        let cache = cache.borrow();
745        let mut map = HashMap::new();
746
747        for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
748            let Some(expiration) = instrument.expiration_ns() else {
749                continue;
750            };
751
752            if expiration != series_id.expiration_ns {
753                continue;
754            }
755
756            if instrument.settlement_currency().code != series_id.settlement_currency {
757                continue;
758            }
759
760            let Some(strike) = instrument.strike_price() else {
761                continue;
762            };
763
764            let Some(kind) = instrument.option_kind() else {
765                continue;
766            };
767
768            map.insert(instrument.id(), (strike, kind));
769        }
770
771        map
772    }
773}
774
775#[cfg(test)]
776mod tests {
777    use std::collections::VecDeque;
778
779    use nautilus_common::clock::TestClock;
780    use nautilus_core::UnixNanos;
781    use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
782    use rstest::*;
783
784    use super::*;
785
786    fn make_series_id() -> OptionSeriesId {
787        OptionSeriesId::new(
788            Venue::new("DERIBIT"),
789            ustr::Ustr::from("BTC"),
790            ustr::Ustr::from("BTC"),
791            UnixNanos::from(1_700_000_000_000_000_000u64),
792        )
793    }
794
795    fn make_test_queue() -> DeferredCommandQueue {
796        Rc::new(RefCell::new(VecDeque::new()))
797    }
798
799    fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
800        let series_id = make_series_id();
801        let topic = switchboard::get_option_chain_topic(series_id);
802        let tracker = AtmTracker::new();
803        let aggregator = OptionChainAggregator::new(
804            series_id,
805            StrikeRange::Fixed(vec![]),
806            tracker,
807            HashMap::new(),
808        );
809        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
810        let queue = make_test_queue();
811
812        let manager = OptionChainManager {
813            aggregator,
814            topic,
815            quote_handlers: Vec::new(),
816            greeks_handlers: Vec::new(),
817            timer_name: None,
818            msgbus_priority: 0,
819            bootstrapped: true,
820            deferred_cmd_queue: queue.clone(),
821            clock,
822            raw_mode: false,
823        };
824        (manager, queue)
825    }
826
827    #[rstest]
828    fn test_manager_handle_quote_no_instrument() {
829        let (mut manager, _queue) = make_manager();
830
831        // Should not panic — quote for unknown instrument
832        let quote = QuoteTick::new(
833            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
834            Price::from("100.00"),
835            Price::from("101.00"),
836            Quantity::from("1.0"),
837            Quantity::from("1.0"),
838            UnixNanos::from(1u64),
839            UnixNanos::from(1u64),
840        );
841        manager.handle_quote(&quote);
842    }
843
844    #[rstest]
845    fn test_manager_publish_slice_empty() {
846        let (mut manager, _queue) = make_manager();
847        // Should not panic — empty slice skips publish
848        manager.publish_slice(UnixNanos::from(100u64));
849    }
850
851    #[rstest]
852    fn test_manager_teardown_no_handlers() {
853        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
854        let (mut manager, _queue) = make_manager();
855        // Should not panic — no handlers to unregister
856        manager.teardown(&clock);
857        assert!(manager.quote_handlers.is_empty());
858    }
859
860    fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
861        let series_id = make_series_id();
862        let topic = switchboard::get_option_chain_topic(series_id);
863
864        let strikes = [45000, 47500, 50000, 52500, 55000];
865        let mut instruments = HashMap::new();
866        for s in &strikes {
867            let strike = Price::from(&s.to_string());
868            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
869            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
870            instruments.insert(call_id, (strike, OptionKind::Call));
871            instruments.insert(put_id, (strike, OptionKind::Put));
872        }
873
874        let tracker = AtmTracker::new();
875        let aggregator = OptionChainAggregator::new(
876            series_id,
877            StrikeRange::AtmRelative {
878                strikes_above: 1,
879                strikes_below: 1,
880            },
881            tracker,
882            instruments,
883        );
884        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
885        let queue = make_test_queue();
886
887        let manager = OptionChainManager {
888            aggregator,
889            topic,
890            quote_handlers: Vec::new(),
891            greeks_handlers: Vec::new(),
892            timer_name: None,
893            msgbus_priority: 0,
894            bootstrapped: false,
895            deferred_cmd_queue: queue.clone(),
896            clock,
897            raw_mode: false,
898        };
899        (manager, queue)
900    }
901
902    fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
903        use nautilus_model::data::option_chain::OptionGreeks;
904        let greeks = OptionGreeks {
905            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
906            underlying_price: Some(50000.0),
907            ..Default::default()
908        };
909        manager.handle_greeks(&greeks);
910    }
911
912    #[rstest]
913    fn test_manager_publish_slice_triggers_rebalance() {
914        let (mut manager, queue) = make_option_chain_manager();
915        // Initially no instruments active (ATM unknown, deferred)
916        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
917
918        // Feed ATM near 50000 via greeks — bootstrap computes active set (3 strikes × 2 = 6)
919        bootstrap_via_greeks(&mut manager);
920        assert!(manager.bootstrapped);
921        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2
922
923        // Deferred queue should contain subscribe commands (6 instruments × 3 = 18 commands)
924        assert_eq!(queue.borrow().len(), 18);
925
926        // publish_slice should still work normally after bootstrap
927        manager.publish_slice(UnixNanos::from(100u64));
928        assert!(manager.aggregator.last_atm_strike().is_some());
929    }
930
931    #[rstest]
932    fn test_manager_add_instrument_new() {
933        let (mut manager, _queue) = make_option_chain_manager();
934        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
935        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
936        let strike = Price::from("57500");
937        let count_before = manager.aggregator.instruments().len();
938
939        let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);
940
941        assert!(result);
942        assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
943    }
944
945    #[rstest]
946    fn test_manager_add_instrument_already_known() {
947        let (mut manager, _queue) = make_option_chain_manager();
948        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
949        let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
950        let strike = Price::from("50000");
951        let count_before = manager.aggregator.instruments().len();
952
953        let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);
954
955        assert!(!result);
956        assert_eq!(manager.aggregator.instruments().len(), count_before);
957    }
958
959    #[rstest]
960    fn test_manager_deferred_bootstrap_on_first_atm() {
961        let (mut manager, queue) = make_option_chain_manager();
962        // Initially not bootstrapped, no active instruments
963        assert!(!manager.bootstrapped);
964        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
965        assert!(queue.borrow().is_empty());
966
967        // Feed ATM via greeks → triggers bootstrap
968        bootstrap_via_greeks(&mut manager);
969
970        assert!(manager.bootstrapped);
971        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2
972        // 6 instruments × 3 commands each (quotes + greeks + instrument_status) = 18 deferred commands
973        assert_eq!(queue.borrow().len(), 18);
974
975        // All commands should be Subscribe variants
976        assert!(
977            queue
978                .borrow()
979                .iter()
980                .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
981        );
982    }
983
984    #[rstest]
985    fn test_manager_bootstrap_idempotent() {
986        use nautilus_model::data::option_chain::OptionGreeks;
987
988        let (mut manager, _queue) = make_option_chain_manager();
989        bootstrap_via_greeks(&mut manager);
990        assert!(manager.bootstrapped);
991        let count = manager.aggregator.instrument_ids().len();
992
993        // Feed another ATM update — bootstrap should not fire again
994        let greeks2 = OptionGreeks {
995            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
996            underlying_price: Some(50200.0),
997            ..Default::default()
998        };
999        manager.handle_greeks(&greeks2);
1000        assert_eq!(manager.aggregator.instrument_ids().len(), count);
1001    }
1002
1003    #[rstest]
1004    fn test_manager_fixed_range_bootstrapped_immediately() {
1005        // Fixed range manager is bootstrapped at creation (no ATM needed)
1006        let (manager, queue) = make_manager();
1007        assert!(manager.bootstrapped);
1008        assert!(queue.borrow().is_empty());
1009    }
1010
1011    #[rstest]
1012    fn test_manager_forward_price_bootstrap_from_greeks() {
1013        use nautilus_model::data::option_chain::OptionGreeks;
1014
1015        let (mut manager, _queue) = make_option_chain_manager();
1016        assert!(!manager.bootstrapped);
1017
1018        // First greeks with underlying_price → updates ATM tracker and triggers bootstrap
1019        let greeks = OptionGreeks {
1020            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1021            underlying_price: Some(50000.0),
1022            ..Default::default()
1023        };
1024        manager.handle_greeks(&greeks);
1025        assert!(manager.bootstrapped);
1026        // 3 strikes × 2 sides = 6 active instruments
1027        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
1028    }
1029
1030    #[rstest]
1031    fn test_manager_forward_price_no_bootstrap_without_underlying() {
1032        use nautilus_model::data::option_chain::OptionGreeks;
1033
1034        let (mut manager, _queue) = make_option_chain_manager();
1035        assert!(!manager.bootstrapped);
1036
1037        // Greeks with no underlying_price → should not bootstrap
1038        let greeks = OptionGreeks {
1039            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1040            underlying_price: None,
1041            ..Default::default()
1042        };
1043        manager.handle_greeks(&greeks);
1044        assert!(!manager.bootstrapped);
1045    }
1046
1047    #[rstest]
1048    fn test_handle_instrument_expired_removes_from_aggregator() {
1049        let (mut manager, queue) = make_option_chain_manager();
1050        // Bootstrap so instruments are active
1051        bootstrap_via_greeks(&mut manager);
1052        assert!(manager.bootstrapped);
1053        let initial_count = manager.aggregator.instruments().len();
1054        queue.borrow_mut().clear(); // clear bootstrap commands
1055
1056        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1057        let is_empty = manager.handle_instrument_expired(&expired_id);
1058
1059        assert!(!is_empty);
1060        assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
1061        assert!(!manager.aggregator.active_ids().contains(&expired_id));
1062    }
1063
1064    #[rstest]
1065    fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
1066        let (mut manager, queue) = make_option_chain_manager();
1067        bootstrap_via_greeks(&mut manager);
1068        queue.borrow_mut().clear();
1069
1070        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1071        manager.handle_instrument_expired(&expired_id);
1072
1073        // Should push 3 unsubscribe commands (quotes + greeks + instrument_status)
1074        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1075        assert_eq!(cmds.len(), 3);
1076        assert!(
1077            cmds.iter()
1078                .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
1079        );
1080    }
1081
1082    #[rstest]
1083    fn test_handle_instrument_expired_returns_true_when_last() {
1084        let series_id = make_series_id();
1085        let topic = switchboard::get_option_chain_topic(series_id);
1086        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1087        let strike = Price::from("50000");
1088        let mut instruments = HashMap::new();
1089        instruments.insert(call_id, (strike, OptionKind::Call));
1090        let tracker = AtmTracker::new();
1091        let aggregator = OptionChainAggregator::new(
1092            series_id,
1093            StrikeRange::Fixed(vec![strike]),
1094            tracker,
1095            instruments,
1096        );
1097        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1098        let queue = make_test_queue();
1099
1100        let mut manager = OptionChainManager {
1101            aggregator,
1102            topic,
1103            quote_handlers: Vec::new(),
1104            greeks_handlers: Vec::new(),
1105            timer_name: None,
1106            msgbus_priority: 0,
1107            bootstrapped: true,
1108            deferred_cmd_queue: queue,
1109            clock,
1110            raw_mode: false,
1111        };
1112
1113        let is_empty = manager.handle_instrument_expired(&call_id);
1114        assert!(is_empty);
1115        assert!(manager.aggregator.is_catalog_empty());
1116    }
1117
1118    #[rstest]
1119    fn test_handle_instrument_expired_unknown_noop() {
1120        let (mut manager, queue) = make_manager();
1121        queue.borrow_mut().clear();
1122
1123        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1124        let is_empty = manager.handle_instrument_expired(&unknown);
1125
1126        // Empty manager returns true (catalog was already empty)
1127        assert!(is_empty);
1128        assert!(queue.borrow().is_empty()); // no deferred commands pushed
1129    }
1130
1131    #[rstest]
1132    fn test_publish_slice_pushes_expire_series_when_expired() {
1133        let (mut manager, queue) = make_option_chain_manager();
1134        bootstrap_via_greeks(&mut manager);
1135        queue.borrow_mut().clear();
1136
1137        // Publish at the expiration timestamp — should push ExpireSeries, not publish
1138        let expiry_ns = manager.aggregator.series_id().expiration_ns;
1139        manager.publish_slice(expiry_ns);
1140
1141        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1142        assert_eq!(cmds.len(), 1);
1143        assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
1144    }
1145
1146    #[rstest]
1147    fn test_expired_instrument_unsubscribes_include_instrument_status() {
1148        let (mut manager, queue) = make_option_chain_manager();
1149        bootstrap_via_greeks(&mut manager);
1150        queue.borrow_mut().clear();
1151
1152        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1153        manager.handle_instrument_expired(&expired_id);
1154
1155        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1156        // Should have exactly one InstrumentStatus unsubscribe among the 3
1157        let status_unsubs = cmds
1158            .iter()
1159            .filter(|c| {
1160                matches!(
1161                    c,
1162                    DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
1163                )
1164            })
1165            .count();
1166        assert_eq!(status_unsubs, 1);
1167    }
1168}