Skip to main content

nautilus_data/option_chains/
aggregator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Per-series option chain aggregator for event accumulation and snapshots.
17
18use std::collections::{BTreeMap, HashMap, HashSet};
19
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        QuoteTick,
24        option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange},
25    },
26    enums::OptionKind,
27    identifiers::{InstrumentId, OptionSeriesId},
28    types::Price,
29};
30
31use super::{
32    AtmTracker,
33    constants::{DEFAULT_REBALANCE_COOLDOWN_NS, DEFAULT_REBALANCE_HYSTERESIS},
34};
35
36/// Per-series aggregator that accumulates quotes and greeks between snapshots.
37///
38/// Owns mutable accumulator buffers and produces immutable `OptionChainSlice`
39/// snapshots on each timer tick.
40#[derive(Debug)]
41pub struct OptionChainAggregator {
42    /// The option series identifier for this aggregator.
43    series_id: OptionSeriesId,
44    /// Defines which strikes to include in the active set.
45    strike_range: StrikeRange,
46    /// Tracks the current ATM price from market data events.
47    atm_tracker: AtmTracker,
48    /// All instruments for this series. Grows dynamically when the exchange
49    /// lists new strikes via [`Self::add_instrument`].
50    instruments: HashMap<InstrumentId, (Price, OptionKind)>,
51    /// Currently active instrument IDs (subset of `instruments`).
52    active_ids: HashSet<InstrumentId>,
53    /// The closest ATM strike at the time of the last rebalance.
54    last_atm_strike: Option<Price>,
55    /// Hysteresis band for ATM rebalancing.
56    hysteresis: f64,
57    /// Minimum nanoseconds between rebalances.
58    cooldown_ns: u64,
59    /// Timestamp of the last rebalance.
60    last_rebalance_ns: Option<UnixNanos>,
61    /// Maximum `ts_event` seen across all quote updates.
62    max_ts_event: UnixNanos,
63    /// Greeks received before the corresponding quote arrived.
64    pending_greeks: HashMap<InstrumentId, OptionGreeks>,
65    /// Call option accumulator buffer keyed by strike price.
66    call_buffer: BTreeMap<Price, OptionStrikeData>,
67    /// Put option accumulator buffer keyed by strike price.
68    put_buffer: BTreeMap<Price, OptionStrikeData>,
69}
70
71impl OptionChainAggregator {
72    /// Creates a new aggregator for the given series.
73    ///
74    /// `instruments` contains ALL instruments for the series. The initial
75    /// `active_ids` subset is resolved from the strike range and the current
76    /// ATM price (if available). When no ATM price is set for ATM-based
77    /// ranges, all instruments are active.
78    pub fn new(
79        series_id: OptionSeriesId,
80        strike_range: StrikeRange,
81        atm_tracker: AtmTracker,
82        instruments: HashMap<InstrumentId, (Price, OptionKind)>,
83    ) -> Self {
84        let mut aggregator = Self {
85            series_id,
86            strike_range,
87            atm_tracker,
88            instruments,
89            active_ids: HashSet::new(),
90            last_atm_strike: None,
91            hysteresis: DEFAULT_REBALANCE_HYSTERESIS,
92            cooldown_ns: DEFAULT_REBALANCE_COOLDOWN_NS,
93            last_rebalance_ns: None,
94            max_ts_event: UnixNanos::default(),
95            pending_greeks: HashMap::new(),
96            call_buffer: BTreeMap::new(),
97            put_buffer: BTreeMap::new(),
98        };
99        // No Greeks exist at construction, so a `Delta` range resolves to its ATM fallback.
100        aggregator.recompute_active_set();
101        aggregator
102    }
103
104    /// Returns a mutable reference to the ATM tracker.
105    pub fn atm_tracker_mut(&mut self) -> &mut AtmTracker {
106        &mut self.atm_tracker
107    }
108
109    /// Returns the currently active instrument IDs.
110    #[must_use]
111    pub fn instrument_ids(&self) -> Vec<InstrumentId> {
112        self.active_ids.iter().copied().collect()
113    }
114
115    /// Returns a reference to the active instrument ID set.
116    #[must_use]
117    pub fn active_ids(&self) -> &HashSet<InstrumentId> {
118        &self.active_ids
119    }
120
121    /// Returns the series ID.
122    #[must_use]
123    pub fn series_id(&self) -> OptionSeriesId {
124        self.series_id
125    }
126
127    /// Returns `true` if the given timestamp is at or past the series expiration.
128    #[must_use]
129    pub fn is_expired(&self, now_ns: UnixNanos) -> bool {
130        now_ns >= self.series_id.expiration_ns
131    }
132
133    /// Returns a reference to the full instrument set.
134    #[must_use]
135    pub fn instruments(&self) -> &HashMap<InstrumentId, (Price, OptionKind)> {
136        &self.instruments
137    }
138
139    /// Returns all instrument IDs in the full set.
140    #[must_use]
141    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
142        self.instruments.keys().copied().collect()
143    }
144
145    /// Returns `true` if the instrument catalog is empty.
146    #[must_use]
147    pub fn is_catalog_empty(&self) -> bool {
148        self.instruments.is_empty()
149    }
150
151    /// Permanently removes an instrument from the catalog.
152    ///
153    /// Removes from `instruments`, `active_ids`, `pending_greeks`, and cleans
154    /// buffer entries (only if no other instrument shares the same strike+kind).
155    /// Returns `true` if the instrument was found and removed.
156    #[must_use]
157    pub fn remove_instrument(&mut self, instrument_id: &InstrumentId) -> bool {
158        let Some((strike, kind)) = self.instruments.remove(instrument_id) else {
159            return false;
160        };
161
162        self.active_ids.remove(instrument_id);
163        self.pending_greeks.remove(instrument_id);
164
165        // Only remove buffer entry if no sibling instrument shares the same strike+kind
166        let has_sibling = self
167            .instruments
168            .values()
169            .any(|(s, k)| *s == strike && *k == kind);
170
171        if !has_sibling {
172            let buffer = match kind {
173                OptionKind::Call => &mut self.call_buffer,
174                OptionKind::Put => &mut self.put_buffer,
175            };
176            buffer.remove(&strike);
177        }
178
179        true
180    }
181
182    /// Returns a reference to the ATM tracker.
183    #[must_use]
184    pub fn atm_tracker(&self) -> &AtmTracker {
185        &self.atm_tracker
186    }
187
188    /// Recomputes the active instrument set from the current ATM price.
189    ///
190    /// Returns the new active instrument IDs. Used during bootstrap when the
191    /// first ATM price arrives after deferred subscription setup.
192    pub fn recompute_active_set(&mut self) -> Vec<InstrumentId> {
193        let atm_price = self.atm_tracker.atm_price();
194        let all_strikes = Self::sorted_strikes(&self.instruments);
195        let active_strikes: HashSet<Price> = self
196            .resolve_active_strikes(atm_price, &all_strikes)
197            .into_iter()
198            .collect();
199        self.active_ids = self
200            .instruments
201            .iter()
202            .filter(|(_, (strike, _))| active_strikes.contains(strike))
203            .map(|(id, _)| *id)
204            .collect();
205        self.last_atm_strike =
206            atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
207        self.active_ids.iter().copied().collect()
208    }
209
210    /// Resolves the active strikes for the current strike range.
211    ///
212    /// `Delta` is resolved here from stored Greeks (see [`Self::resolve_delta`]);
213    /// the price-based variants delegate to [`StrikeRange::resolve`].
214    fn resolve_active_strikes(
215        &self,
216        atm_price: Option<Price>,
217        all_strikes: &[Price],
218    ) -> Vec<Price> {
219        match &self.strike_range {
220            StrikeRange::Delta { target, tolerance } => {
221                self.resolve_delta(*target, *tolerance, atm_price, all_strikes)
222            }
223            _ => self.strike_range.resolve(atm_price, all_strikes),
224        }
225    }
226
227    /// Resolves strikes whose buffered or pending Greeks have an absolute delta
228    /// within `tolerance` of `target`.
229    ///
230    /// A strike qualifies when either its call or put delta magnitude matches
231    /// (calls have positive delta, puts negative; both are compared by absolute
232    /// value), so a typical target selects an OTM strike on each side of ATM.
233    /// Strikes with only pending Greeks (received before their first quote) are
234    /// eligible. When no Greeks are available yet, or none fall in the band, this
235    /// falls back to the ATM-relative window from [`StrikeRange::resolve`].
236    fn resolve_delta(
237        &self,
238        target: f64,
239        tolerance: f64,
240        atm_price: Option<Price>,
241        all_strikes: &[Price],
242    ) -> Vec<Price> {
243        let selected: Vec<Price> = self
244            .deltas_by_strike()
245            .into_iter()
246            .filter(|(_, deltas)| {
247                deltas
248                    .iter()
249                    .any(|delta| Self::delta_within_band(*delta, target, tolerance))
250            })
251            .map(|(strike, _)| strike)
252            .collect();
253
254        if selected.is_empty() {
255            return self.strike_range.resolve(atm_price, all_strikes);
256        }
257
258        selected
259    }
260
261    /// Collects every reported delta per strike, from buffered Greeks and from
262    /// Greeks still pending their first quote.
263    fn deltas_by_strike(&self) -> BTreeMap<Price, Vec<f64>> {
264        let mut deltas_by_strike: BTreeMap<Price, Vec<f64>> = BTreeMap::new();
265
266        for (strike, data) in self.call_buffer.iter().chain(self.put_buffer.iter()) {
267            if let Some(greeks) = data.greeks.as_ref() {
268                deltas_by_strike
269                    .entry(*strike)
270                    .or_default()
271                    .push(greeks.delta);
272            }
273        }
274
275        for (id, greeks) in &self.pending_greeks {
276            if let Some((strike, _)) = self.instruments.get(id) {
277                deltas_by_strike
278                    .entry(*strike)
279                    .or_default()
280                    .push(greeks.delta);
281            }
282        }
283
284        deltas_by_strike
285    }
286
287    /// Returns `true` when `delta`'s magnitude is within `tolerance` of `target`.
288    ///
289    /// Compares by absolute value so a put (negative delta) matches the same
290    /// target as the equivalent call.
291    fn delta_within_band(delta: f64, target: f64, tolerance: f64) -> bool {
292        (delta.abs() - target).abs() <= tolerance
293    }
294
295    /// Adds a newly discovered instrument to the series.
296    ///
297    /// Returns `true` if the instrument was newly inserted. Returns `false`
298    /// if it was already known (no-op). When the new instrument's strike
299    /// falls within the current active range, it is immediately added to
300    /// `active_ids`.
301    #[must_use]
302    pub fn add_instrument(
303        &mut self,
304        instrument_id: InstrumentId,
305        strike: Price,
306        kind: OptionKind,
307    ) -> bool {
308        if self.instruments.contains_key(&instrument_id) {
309            return false;
310        }
311
312        self.instruments.insert(instrument_id, (strike, kind));
313
314        // Determine if the new strike is in the current active range
315        let all_strikes = Self::sorted_strikes(&self.instruments);
316        let atm_price = self.atm_tracker.atm_price();
317        let active_strikes: HashSet<Price> = self
318            .resolve_active_strikes(atm_price, &all_strikes)
319            .into_iter()
320            .collect();
321
322        if active_strikes.contains(&strike) {
323            self.active_ids.insert(instrument_id);
324        }
325
326        true
327    }
328
329    /// Returns sorted, deduplicated strikes from the given instruments.
330    fn sorted_strikes(instruments: &HashMap<InstrumentId, (Price, OptionKind)>) -> Vec<Price> {
331        let mut strikes: Vec<Price> = instruments.values().map(|(s, _)| *s).collect();
332        strikes.sort();
333        strikes.dedup();
334        strikes
335    }
336
337    /// Finds the strike in `all_strikes` closest to `atm`.
338    fn find_closest_strike(all_strikes: &[Price], atm: Price) -> Option<Price> {
339        all_strikes
340            .iter()
341            .min_by(|a, b| {
342                let da = (a.as_f64() - atm.as_f64()).abs();
343                let db = (b.as_f64() - atm.as_f64()).abs();
344                da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
345            })
346            .copied()
347    }
348
349    /// Handles an incoming quote tick by updating the accumulator buffers.
350    pub fn update_quote(&mut self, quote: &QuoteTick) {
351        if self.is_expired(quote.ts_event) {
352            log::warn!(
353                "Dropping quote for {}, series {} expired at {}",
354                quote.instrument_id,
355                self.series_id,
356                self.series_id.expiration_ns,
357            );
358            return;
359        }
360
361        if !self.active_ids.contains(&quote.instrument_id) {
362            return;
363        }
364
365        if let Some(&(strike, kind)) = self.instruments.get(&quote.instrument_id) {
366            // Track max ts_event across all quotes
367            if quote.ts_event > self.max_ts_event {
368                self.max_ts_event = quote.ts_event;
369            }
370
371            let buffer = match kind {
372                OptionKind::Call => &mut self.call_buffer,
373                OptionKind::Put => &mut self.put_buffer,
374            };
375
376            match buffer.get_mut(&strike) {
377                Some(data) => data.quote = *quote,
378                None => {
379                    // Check for pending greeks that arrived before this first quote
380                    let greeks = self.pending_greeks.remove(&quote.instrument_id);
381                    buffer.insert(
382                        strike,
383                        OptionStrikeData {
384                            quote: *quote,
385                            greeks,
386                        },
387                    );
388                }
389            }
390        }
391    }
392
393    /// Handles incoming greeks by updating the accumulator buffers.
394    ///
395    /// If no quote has arrived yet for this instrument (no buffer entry),
396    /// the greeks are stored in `pending_greeks` and will be attached when
397    /// the first quote arrives.
398    pub fn update_greeks(&mut self, greeks: &OptionGreeks) {
399        if self.is_expired(greeks.ts_event) {
400            log::warn!(
401                "Dropping greeks for {}, series {} expired at {}",
402                greeks.instrument_id,
403                self.series_id,
404                self.series_id.expiration_ns,
405            );
406            return;
407        }
408
409        if !self.active_ids.contains(&greeks.instrument_id) {
410            return;
411        }
412
413        if let Some(&(strike, kind)) = self.instruments.get(&greeks.instrument_id) {
414            let buffer = match kind {
415                OptionKind::Call => &mut self.call_buffer,
416                OptionKind::Put => &mut self.put_buffer,
417            };
418
419            match buffer.get_mut(&strike) {
420                Some(data) => data.greeks = Some(*greeks),
421                None => {
422                    // No quote yet: park the greeks for later
423                    self.pending_greeks.insert(greeks.instrument_id, *greeks);
424                }
425            }
426        }
427    }
428
429    /// Creates a point-in-time snapshot from accumulated buffers, applying strike filtering.
430    ///
431    /// Buffers are preserved (keep-latest semantics) so instruments that didn't
432    /// quote since the last tick are still included in subsequent snapshots.
433    ///
434    /// # Panics
435    ///
436    /// Panics if strike prices cannot be compared (NaN values).
437    pub fn snapshot(&self, ts_init: UnixNanos) -> OptionChainSlice {
438        let atm_price = self.atm_tracker.atm_price();
439
440        // Use catalog strikes for ATM strike (most accurate closest-strike lookup)
441        let catalog_strikes = Self::sorted_strikes(&self.instruments);
442        let atm_strike = atm_price.and_then(|atm| Self::find_closest_strike(&catalog_strikes, atm));
443
444        // Filter buffers using active set strikes directly. The active set is already
445        // the result of strike range resolution from the last rebalance. Re-resolving
446        // here would shift the window during hysteresis/cooldown, dropping buffered data.
447        let active_strikes: HashSet<Price> = self
448            .active_ids
449            .iter()
450            .filter_map(|id| self.instruments.get(id).map(|(s, _)| *s))
451            .collect();
452
453        // Build filtered snapshot (clone from buffers)
454        let mut calls = BTreeMap::new();
455
456        for (strike, data) in &self.call_buffer {
457            if active_strikes.contains(strike) {
458                calls.insert(*strike, data.clone());
459            }
460        }
461        let mut puts = BTreeMap::new();
462
463        for (strike, data) in &self.put_buffer {
464            if active_strikes.contains(strike) {
465                puts.insert(*strike, data.clone());
466            }
467        }
468
469        // Use the max observed ts_event from quotes, falling back to ts_init
470        let ts_event = if self.max_ts_event == UnixNanos::default() {
471            ts_init
472        } else {
473            self.max_ts_event
474        };
475
476        OptionChainSlice {
477            series_id: self.series_id,
478            atm_strike,
479            calls,
480            puts,
481            ts_event,
482            ts_init,
483        }
484    }
485
486    /// Returns `true` if both buffers are empty.
487    #[must_use]
488    pub fn is_buffer_empty(&self) -> bool {
489        self.call_buffer.is_empty() && self.put_buffer.is_empty()
490    }
491
492    /// Checks whether the instrument set should be rebalanced around the current ATM.
493    ///
494    /// Returns `None` when no rebalancing is needed (fixed ranges, no ATM price,
495    /// ATM strike unchanged, hysteresis not exceeded, or cooldown not elapsed).
496    /// Returns `Some(RebalanceAction)` with instrument add/remove lists when the
497    /// closest ATM strike shifts past the hysteresis threshold.
498    ///
499    /// `Delta` ranges resolve from Greeks rather than an ATM window, so their
500    /// active set can change while the closest ATM strike is unchanged. They skip
501    /// the ATM-shift and hysteresis gates and rebalance on any resolved-set change,
502    /// with the cooldown still applied to throttle churn.
503    #[must_use]
504    pub fn check_rebalance(&self, now_ns: UnixNanos) -> Option<RebalanceAction> {
505        // Fixed ranges never rebalance
506        if matches!(self.strike_range, StrikeRange::Fixed(_)) {
507            return None;
508        }
509
510        let atm_price = self.atm_tracker.atm_price()?;
511        let all_strikes = Self::sorted_strikes(&self.instruments);
512        let current_atm_strike = Self::find_closest_strike(&all_strikes, atm_price)?;
513
514        let is_delta = matches!(self.strike_range, StrikeRange::Delta { .. });
515
516        if !is_delta {
517            // No change: no rebalance
518            if self.last_atm_strike == Some(current_atm_strike) {
519                return None;
520            }
521
522            // Hysteresis check: price must cross hysteresis fraction of the gap to next strike
523            if let Some(last_strike) = self.last_atm_strike
524                && self.hysteresis > 0.0
525            {
526                let last_f = last_strike.as_f64();
527                let atm_f = atm_price.as_f64();
528                let direction = atm_f - last_f;
529
530                // Find the next strike in the direction of price movement
531                let next_strike = if direction > 0.0 {
532                    all_strikes.iter().find(|s| s.as_f64() > last_f)
533                } else {
534                    all_strikes.iter().rev().find(|s| s.as_f64() < last_f)
535                };
536
537                if let Some(next) = next_strike {
538                    let gap = (next.as_f64() - last_f).abs();
539                    let threshold = last_f + direction.signum() * self.hysteresis * gap;
540                    // Check if price has not crossed the threshold
541                    if direction > 0.0 && atm_f < threshold {
542                        return None;
543                    }
544
545                    if direction < 0.0 && atm_f > threshold {
546                        return None;
547                    }
548                }
549            }
550        }
551
552        // Cooldown check
553        if self.cooldown_ns > 0
554            && let Some(last_ts) = self.last_rebalance_ns
555            && now_ns.as_u64().saturating_sub(last_ts.as_u64()) < self.cooldown_ns
556        {
557            return None;
558        }
559
560        // Compute new active set
561        let new_active_strikes: HashSet<Price> = self
562            .resolve_active_strikes(Some(atm_price), &all_strikes)
563            .into_iter()
564            .collect();
565        let new_active: HashSet<InstrumentId> = self
566            .instruments
567            .iter()
568            .filter(|(_, (s, _))| new_active_strikes.contains(s))
569            .map(|(id, _)| *id)
570            .collect();
571
572        let add: Vec<InstrumentId> = new_active.difference(&self.active_ids).copied().collect();
573        let remove: Vec<InstrumentId> = self.active_ids.difference(&new_active).copied().collect();
574
575        // Suppress no-op delta rebalances so the cooldown timestamp is not reset on
576        // every snapshot while the resolved set is stable.
577        if is_delta && add.is_empty() && remove.is_empty() {
578            return None;
579        }
580
581        Some(RebalanceAction { add, remove })
582    }
583
584    /// Applies a rebalance action: updates the active ID set, cleans stale buffers,
585    /// and records the rebalance timestamp.
586    pub fn apply_rebalance(&mut self, action: &RebalanceAction, now_ns: UnixNanos) {
587        for id in &action.add {
588            self.active_ids.insert(*id);
589        }
590
591        for id in &action.remove {
592            self.active_ids.remove(id);
593        }
594
595        // Clean buffers for strikes no longer in active set
596        let active_strikes: HashSet<Price> = self
597            .active_ids
598            .iter()
599            .filter_map(|id| self.instruments.get(id))
600            .map(|(s, _)| *s)
601            .collect();
602        self.call_buffer
603            .retain(|strike, _| active_strikes.contains(strike));
604        self.put_buffer
605            .retain(|strike, _| active_strikes.contains(strike));
606        self.pending_greeks
607            .retain(|id, _| self.active_ids.contains(id));
608
609        // Update last_atm_strike and record rebalance timestamp
610        if let Some(atm) = self.atm_tracker.atm_price() {
611            let all_strikes = Self::sorted_strikes(&self.instruments);
612            self.last_atm_strike = Self::find_closest_strike(&all_strikes, atm);
613        }
614        self.last_rebalance_ns = Some(now_ns);
615    }
616}
617
618/// Describes instruments to add and remove during an ATM rebalance.
619#[derive(Clone, Debug, PartialEq, Eq)]
620pub struct RebalanceAction {
621    /// Instruments to subscribe to (newly in range).
622    pub add: Vec<InstrumentId>,
623    /// Instruments to unsubscribe from (no longer in range).
624    pub remove: Vec<InstrumentId>,
625}
626
627#[cfg(test)]
628impl OptionChainAggregator {
629    fn call_buffer_len(&self) -> usize {
630        self.call_buffer.len()
631    }
632
633    fn put_buffer_len(&self) -> usize {
634        self.put_buffer.len()
635    }
636
637    fn get_call_greeks_from_buffer(&self, strike: &Price) -> Option<&OptionGreeks> {
638        self.call_buffer.get(strike).and_then(|d| d.greeks.as_ref())
639    }
640
641    pub(crate) fn last_atm_strike(&self) -> Option<Price> {
642        self.last_atm_strike
643    }
644
645    fn set_hysteresis(&mut self, h: f64) {
646        self.hysteresis = h;
647    }
648
649    fn set_cooldown_ns(&mut self, ns: u64) {
650        self.cooldown_ns = ns;
651    }
652
653    fn pending_greeks_count(&self) -> usize {
654        self.pending_greeks.len()
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use nautilus_model::{data::greeks::OptionGreekValues, identifiers::Venue, types::Quantity};
661    use rstest::*;
662
663    use super::*;
664
665    fn make_series_id() -> OptionSeriesId {
666        OptionSeriesId::new(
667            Venue::new("DERIBIT"),
668            ustr::Ustr::from("BTC"),
669            ustr::Ustr::from("BTC"),
670            UnixNanos::from(1_700_000_000_000_000_000u64),
671        )
672    }
673
674    fn make_quote(instrument_id: InstrumentId, bid: &str, ask: &str) -> QuoteTick {
675        QuoteTick::new(
676            instrument_id,
677            Price::from(bid),
678            Price::from(ask),
679            Quantity::from("1.0"),
680            Quantity::from("1.0"),
681            UnixNanos::from(1u64),
682            UnixNanos::from(1u64),
683        )
684    }
685
686    fn now() -> UnixNanos {
687        // A base timestamp for tests (far enough from zero to avoid edge cases)
688        UnixNanos::from(1_000_000_000_000_000_000u64)
689    }
690
691    /// Sets ATM price on an aggregator via a synthetic `OptionGreeks` with the given forward price.
692    fn set_atm_via_greeks(agg: &mut OptionChainAggregator, price: f64) {
693        let greeks = OptionGreeks {
694            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
695            underlying_price: Some(price),
696            ..Default::default()
697        };
698        agg.atm_tracker_mut().update_from_option_greeks(&greeks);
699    }
700
701    fn make_aggregator() -> (OptionChainAggregator, InstrumentId, InstrumentId) {
702        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
703        let put_id = InstrumentId::from("BTC-20240101-50000-P.DERIBIT");
704        let strike = Price::from("50000");
705
706        let mut instrument_map = HashMap::new();
707        instrument_map.insert(call_id, (strike, OptionKind::Call));
708        instrument_map.insert(put_id, (strike, OptionKind::Put));
709
710        let tracker = AtmTracker::new();
711        let agg = OptionChainAggregator::new(
712            make_series_id(),
713            StrikeRange::Fixed(vec![strike]),
714            tracker,
715            instrument_map,
716        );
717
718        (agg, call_id, put_id)
719    }
720
721    #[rstest]
722    fn test_aggregator_instrument_ids() {
723        let (agg, call_id, put_id) = make_aggregator();
724        let ids = agg.instrument_ids();
725        assert_eq!(ids.len(), 2);
726        assert!(ids.contains(&call_id));
727        assert!(ids.contains(&put_id));
728    }
729
730    #[rstest]
731    fn test_aggregator_update_quote() {
732        let (mut agg, call_id, _) = make_aggregator();
733        let quote = make_quote(call_id, "100.00", "101.00");
734
735        agg.update_quote(&quote);
736
737        assert_eq!(agg.call_buffer_len(), 1);
738        assert_eq!(agg.put_buffer_len(), 0);
739    }
740
741    #[rstest]
742    fn test_aggregator_update_greeks() {
743        let (mut agg, call_id, _) = make_aggregator();
744        let quote = make_quote(call_id, "100.00", "101.00");
745        agg.update_quote(&quote);
746
747        let greeks = OptionGreeks {
748            instrument_id: call_id,
749            greeks: OptionGreekValues {
750                delta: 0.55,
751                ..Default::default()
752            },
753            ..Default::default()
754        };
755        agg.update_greeks(&greeks);
756
757        let strike = Price::from("50000");
758        let data = agg.get_call_greeks_from_buffer(&strike);
759        assert!(data.is_some());
760        assert_eq!(data.unwrap().delta, 0.55);
761    }
762
763    #[rstest]
764    fn test_aggregator_snapshot_preserves_state() {
765        let (mut agg, call_id, _) = make_aggregator();
766        let quote = make_quote(call_id, "100.00", "101.00");
767        agg.update_quote(&quote);
768
769        let slice = agg.snapshot(UnixNanos::from(100u64));
770        assert_eq!(slice.call_count(), 1);
771        assert_eq!(slice.ts_init, UnixNanos::from(100u64));
772
773        // Buffers should still contain data (keep-latest semantics)
774        assert!(!agg.is_buffer_empty());
775
776        // Second snapshot should return the same data
777        let slice2 = agg.snapshot(UnixNanos::from(200u64));
778        assert_eq!(slice2.call_count(), 1);
779        assert_eq!(slice2.ts_init, UnixNanos::from(200u64));
780    }
781
782    #[rstest]
783    fn test_aggregator_ignores_unknown_instrument() {
784        let (mut agg, _, _) = make_aggregator();
785        let unknown_id = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
786        let quote = make_quote(unknown_id, "100.00", "101.00");
787
788        agg.update_quote(&quote);
789
790        assert!(agg.is_buffer_empty());
791    }
792
793    #[rstest]
794    fn test_check_rebalance_returns_none() {
795        let (agg, _, _) = make_aggregator();
796        assert!(agg.check_rebalance(now()).is_none());
797    }
798
799    // -- Rebalance tests --
800
801    /// Builds instruments with 5 strike prices (45000..55000 step 2500) and `AtmRelative` +-1.
802    /// Hysteresis and cooldown are disabled so existing rebalance tests pass unchanged.
803    fn make_multi_strike_aggregator() -> OptionChainAggregator {
804        let strikes = [45000, 47500, 50000, 52500, 55000];
805        let mut instruments = HashMap::new();
806
807        for s in &strikes {
808            let strike = Price::from(&s.to_string());
809            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
810            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
811            instruments.insert(call_id, (strike, OptionKind::Call));
812            instruments.insert(put_id, (strike, OptionKind::Put));
813        }
814
815        let tracker = AtmTracker::new();
816        let mut agg = OptionChainAggregator::new(
817            make_series_id(),
818            StrikeRange::AtmRelative {
819                strikes_above: 1,
820                strikes_below: 1,
821            },
822            tracker,
823            instruments,
824        );
825        // Disable guards so existing tests exercise pure rebalance logic
826        agg.set_hysteresis(0.0);
827        agg.set_cooldown_ns(0);
828        agg
829    }
830
831    #[rstest]
832    fn test_check_rebalance_fixed_always_none() {
833        // Fixed range + ATM price set: still returns None
834        let (mut agg, _, _) = make_aggregator();
835        set_atm_via_greeks(&mut agg, 50000.0);
836        assert!(agg.check_rebalance(now()).is_none());
837    }
838
839    #[rstest]
840    fn test_check_rebalance_no_atm_returns_none() {
841        let agg = make_multi_strike_aggregator();
842        // No ATM price set: None
843        assert!(agg.check_rebalance(now()).is_none());
844    }
845
846    #[rstest]
847    fn test_check_rebalance_atm_unchanged_returns_none() {
848        let mut agg = make_multi_strike_aggregator();
849        // Set ATM to 50000 and apply initial rebalance
850        set_atm_via_greeks(&mut agg, 50000.0);
851        // First check detects ATM shift from None to 50000
852        let action = agg.check_rebalance(now()).unwrap();
853        agg.apply_rebalance(&action, now());
854
855        // ATM moves slightly but stays closest to 50000
856        set_atm_via_greeks(&mut agg, 50200.0);
857        assert!(agg.check_rebalance(now()).is_none());
858    }
859
860    #[rstest]
861    fn test_check_rebalance_detects_atm_shift() {
862        let mut agg = make_multi_strike_aggregator();
863        // Set ATM near 50000
864        set_atm_via_greeks(&mut agg, 50000.0);
865        let action = agg.check_rebalance(now()).unwrap();
866        agg.apply_rebalance(&action, now());
867        // Active: 47500, 50000, 52500 (ATM=50000, +-1 strike)
868        assert_eq!(agg.instrument_ids().len(), 6); // 3 strikes * 2
869
870        // Now shift ATM to 55000
871        set_atm_via_greeks(&mut agg, 55000.0);
872        let action2 = agg.check_rebalance(now()).unwrap();
873        // Should have instruments to add (55000) and remove (47500)
874        assert!(!action2.add.is_empty() || !action2.remove.is_empty());
875    }
876
877    #[rstest]
878    fn test_apply_rebalance_updates_instrument_map() {
879        let mut agg = make_multi_strike_aggregator();
880        // Set ATM near 50000
881        set_atm_via_greeks(&mut agg, 50000.0);
882        let action = agg.check_rebalance(now()).unwrap();
883        agg.apply_rebalance(&action, now());
884
885        // Active should be 3 strikes (47500, 50000, 52500)
886        let active_ids = agg.instrument_ids();
887        assert_eq!(active_ids.len(), 6); // 3 strikes * 2 (call + put)
888
889        // Now shift to 55000
890        set_atm_via_greeks(&mut agg, 55000.0);
891        let action2 = agg.check_rebalance(now()).unwrap();
892        agg.apply_rebalance(&action2, now());
893
894        // Active should now be (52500, 55000): 2 strikes at the top end
895        let active_ids2 = agg.instrument_ids();
896        assert_eq!(active_ids2.len(), 4); // 2 strikes * 2
897    }
898
899    #[rstest]
900    fn test_apply_rebalance_cleans_buffers() {
901        let mut agg = make_multi_strike_aggregator();
902        // Set ATM near 50000
903        set_atm_via_greeks(&mut agg, 50000.0);
904        let action = agg.check_rebalance(now()).unwrap();
905        agg.apply_rebalance(&action, now());
906
907        // Feed quotes for the 47500 call
908        let call_47500 = InstrumentId::from("BTC-20240101-47500-C.DERIBIT");
909        let quote = make_quote(call_47500, "100.00", "101.00");
910        agg.update_quote(&quote);
911        assert_eq!(agg.call_buffer_len(), 1);
912
913        // Now shift ATM up so 47500 is out of range
914        set_atm_via_greeks(&mut agg, 55000.0);
915        let action2 = agg.check_rebalance(now()).unwrap();
916        agg.apply_rebalance(&action2, now());
917
918        // Buffer for 47500 should be cleaned
919        assert_eq!(agg.call_buffer_len(), 0);
920    }
921
922    #[rstest]
923    fn test_initial_active_set_empty_when_no_atm() {
924        let agg = make_multi_strike_aggregator();
925        // AtmRelative with no ATM price: empty active set (deferred)
926        assert_eq!(agg.instrument_ids().len(), 0);
927        assert_eq!(agg.all_instrument_ids().len(), 10);
928    }
929
930    #[rstest]
931    fn test_catalog_vs_active_separation() {
932        let mut agg = make_multi_strike_aggregator();
933        // Set ATM near 50000 to narrow active set
934        set_atm_via_greeks(&mut agg, 50000.0);
935        let action = agg.check_rebalance(now()).unwrap();
936        agg.apply_rebalance(&action, now());
937
938        // Catalog should still have all 10 instruments
939        assert_eq!(agg.instruments().len(), 10);
940        // Active should be a subset
941        assert_eq!(agg.instrument_ids().len(), 6);
942    }
943
944    // -- add_instrument tests --
945
946    #[rstest]
947    fn test_add_instrument_already_known() {
948        let (mut agg, call_id, _) = make_aggregator();
949        let strike = Price::from("50000");
950        let count_before = agg.instruments().len();
951
952        let result = agg.add_instrument(call_id, strike, OptionKind::Call);
953
954        assert!(!result);
955        assert_eq!(agg.instruments().len(), count_before);
956    }
957
958    #[rstest]
959    fn test_add_instrument_new_in_active_range() {
960        let (mut agg, _, _) = make_aggregator();
961        // Fixed range includes strike 50000; adding another instrument at same strike
962        let new_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
963        let strike = Price::from("50000");
964
965        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
966
967        assert!(result);
968        assert_eq!(agg.instruments().len(), 3);
969        assert!(agg.active_ids().contains(&new_id));
970    }
971
972    #[rstest]
973    fn test_add_instrument_new_out_of_range() {
974        let (mut agg, _, _) = make_aggregator();
975        // Fixed range only includes 50000; adding instrument at 60000
976        let new_id = InstrumentId::from("BTC-20240101-60000-C.DERIBIT");
977        let strike = Price::from("60000");
978
979        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
980
981        assert!(result);
982        assert_eq!(agg.instruments().len(), 3);
983        assert!(!agg.active_ids().contains(&new_id));
984    }
985
986    #[rstest]
987    fn test_add_instrument_available_for_rebalance() {
988        let mut agg = make_multi_strike_aggregator();
989        // Set ATM near 50000 and apply initial rebalance
990        set_atm_via_greeks(&mut agg, 50000.0);
991        let action = agg.check_rebalance(now()).unwrap();
992        agg.apply_rebalance(&action, now());
993        // Active: 47500, 50000, 52500 (6 instruments)
994        assert_eq!(agg.instrument_ids().len(), 6);
995
996        // Add a new instrument at strike 57500 (out of current range)
997        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
998        let strike = Price::from("57500");
999        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
1000        assert!(result);
1001        assert!(!agg.active_ids().contains(&new_id));
1002
1003        // Shift ATM to 57500: rebalance should pick up the new instrument
1004        set_atm_via_greeks(&mut agg, 57500.0);
1005        let action2 = agg.check_rebalance(now()).unwrap();
1006        agg.apply_rebalance(&action2, now());
1007
1008        assert!(agg.active_ids().contains(&new_id));
1009    }
1010
1011    // -- Hysteresis tests --
1012
1013    #[rstest]
1014    fn test_hysteresis_blocks_small_movement() {
1015        let strikes = [47500, 50000, 52500];
1016        let mut instruments = HashMap::new();
1017
1018        for s in &strikes {
1019            let strike = Price::from(&s.to_string());
1020            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1021            instruments.insert(call_id, (strike, OptionKind::Call));
1022        }
1023        let tracker = AtmTracker::new();
1024        let mut agg = OptionChainAggregator::new(
1025            make_series_id(),
1026            StrikeRange::AtmRelative {
1027                strikes_above: 1,
1028                strikes_below: 1,
1029            },
1030            tracker,
1031            instruments,
1032        );
1033        agg.set_hysteresis(0.6);
1034        agg.set_cooldown_ns(0);
1035
1036        // Set ATM to 50000
1037        set_atm_via_greeks(&mut agg, 50000.0);
1038        let action = agg.check_rebalance(now()).unwrap();
1039        agg.apply_rebalance(&action, now());
1040        assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));
1041
1042        // Move ATM slightly toward 52500: gap=2500, threshold=50000+0.6*2500=51500
1043        // 51000 does NOT cross 51500
1044        set_atm_via_greeks(&mut agg, 51000.0);
1045        assert!(agg.check_rebalance(now()).is_none());
1046    }
1047
1048    #[rstest]
1049    fn test_hysteresis_allows_large_movement() {
1050        let strikes = [47500, 50000, 52500];
1051        let mut instruments = HashMap::new();
1052
1053        for s in &strikes {
1054            let strike = Price::from(&s.to_string());
1055            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1056            instruments.insert(call_id, (strike, OptionKind::Call));
1057        }
1058        let tracker = AtmTracker::new();
1059        let mut agg = OptionChainAggregator::new(
1060            make_series_id(),
1061            StrikeRange::AtmRelative {
1062                strikes_above: 1,
1063                strikes_below: 1,
1064            },
1065            tracker,
1066            instruments,
1067        );
1068        agg.set_hysteresis(0.6);
1069        agg.set_cooldown_ns(0);
1070
1071        // Set ATM to 50000
1072        set_atm_via_greeks(&mut agg, 50000.0);
1073        let action = agg.check_rebalance(now()).unwrap();
1074        agg.apply_rebalance(&action, now());
1075
1076        // Move ATM well past threshold: 52000 > 51500
1077        set_atm_via_greeks(&mut agg, 52000.0);
1078        assert!(agg.check_rebalance(now()).is_some());
1079    }
1080
1081    #[rstest]
1082    fn test_zero_hysteresis_disables_guard() {
1083        let mut agg = make_multi_strike_aggregator();
1084        agg.set_hysteresis(0.0);
1085        agg.set_cooldown_ns(0);
1086
1087        set_atm_via_greeks(&mut agg, 50000.0);
1088        let action = agg.check_rebalance(now()).unwrap();
1089        agg.apply_rebalance(&action, now());
1090
1091        // Any shift past the strike boundary triggers rebalance
1092        set_atm_via_greeks(&mut agg, 52500.0);
1093        assert!(agg.check_rebalance(now()).is_some());
1094    }
1095
1096    // -- Cooldown tests --
1097
1098    #[rstest]
1099    fn test_cooldown_blocks_rapid_rebalance() {
1100        let mut agg = make_multi_strike_aggregator();
1101        agg.set_hysteresis(0.0);
1102        agg.set_cooldown_ns(5_000_000_000); // 5s
1103
1104        set_atm_via_greeks(&mut agg, 50000.0);
1105        let t0 = now();
1106        let action = agg.check_rebalance(t0).unwrap();
1107        agg.apply_rebalance(&action, t0);
1108
1109        // Shift ATM immediately: cooldown blocks
1110        set_atm_via_greeks(&mut agg, 55000.0);
1111        let t1 = UnixNanos::from(t0.as_u64() + 1_000_000_000); // 1s later
1112        assert!(agg.check_rebalance(t1).is_none());
1113    }
1114
1115    #[rstest]
1116    fn test_cooldown_allows_after_elapsed() {
1117        let mut agg = make_multi_strike_aggregator();
1118        agg.set_hysteresis(0.0);
1119        agg.set_cooldown_ns(5_000_000_000); // 5s
1120
1121        set_atm_via_greeks(&mut agg, 50000.0);
1122        let t0 = now();
1123        let action = agg.check_rebalance(t0).unwrap();
1124        agg.apply_rebalance(&action, t0);
1125
1126        // Shift ATM after cooldown elapses
1127        set_atm_via_greeks(&mut agg, 55000.0);
1128        let t1 = UnixNanos::from(t0.as_u64() + 6_000_000_000); // 6s later
1129        assert!(agg.check_rebalance(t1).is_some());
1130    }
1131
1132    #[rstest]
1133    fn test_zero_cooldown_disables_guard() {
1134        let mut agg = make_multi_strike_aggregator();
1135        agg.set_hysteresis(0.0);
1136        agg.set_cooldown_ns(0);
1137
1138        set_atm_via_greeks(&mut agg, 50000.0);
1139        let t0 = now();
1140        let action = agg.check_rebalance(t0).unwrap();
1141        agg.apply_rebalance(&action, t0);
1142
1143        // Shift ATM immediately: no cooldown block
1144        set_atm_via_greeks(&mut agg, 55000.0);
1145        assert!(agg.check_rebalance(t0).is_some());
1146    }
1147
1148    // -- Pending greeks tests --
1149
1150    #[rstest]
1151    fn test_pending_greeks_consumed_on_first_quote() {
1152        let (mut agg, call_id, _) = make_aggregator();
1153
1154        // Send greeks before any quote
1155        let greeks = OptionGreeks {
1156            instrument_id: call_id,
1157            greeks: OptionGreekValues {
1158                delta: 0.55,
1159                ..Default::default()
1160            },
1161            ..Default::default()
1162        };
1163        agg.update_greeks(&greeks);
1164        assert_eq!(agg.pending_greeks_count(), 1);
1165
1166        // Now send the first quote: pending greeks should be consumed
1167        let quote = make_quote(call_id, "100.00", "101.00");
1168        agg.update_quote(&quote);
1169        assert_eq!(agg.pending_greeks_count(), 0);
1170
1171        // Verify greeks were attached
1172        let strike = Price::from("50000");
1173        let data = agg.get_call_greeks_from_buffer(&strike);
1174        assert!(data.is_some());
1175        assert_eq!(data.unwrap().delta, 0.55);
1176    }
1177
1178    // -- ts_event tracking tests --
1179
1180    #[rstest]
1181    fn test_snapshot_ts_event_reflects_max_quote_timestamp() {
1182        let (mut agg, call_id, put_id) = make_aggregator();
1183
1184        let quote1 = QuoteTick::new(
1185            call_id,
1186            Price::from("100.00"),
1187            Price::from("101.00"),
1188            Quantity::from("1.0"),
1189            Quantity::from("1.0"),
1190            UnixNanos::from(500u64), // ts_event
1191            UnixNanos::from(500u64),
1192        );
1193        agg.update_quote(&quote1);
1194
1195        let quote2 = QuoteTick::new(
1196            put_id,
1197            Price::from("50.00"),
1198            Price::from("51.00"),
1199            Quantity::from("1.0"),
1200            Quantity::from("1.0"),
1201            UnixNanos::from(800u64), // ts_event: later
1202            UnixNanos::from(800u64),
1203        );
1204        agg.update_quote(&quote2);
1205
1206        let slice = agg.snapshot(UnixNanos::from(1000u64));
1207        assert_eq!(slice.ts_event, UnixNanos::from(800u64));
1208        assert_eq!(slice.ts_init, UnixNanos::from(1000u64));
1209    }
1210
1211    #[rstest]
1212    fn test_snapshot_ts_event_fallback_when_no_quotes() {
1213        let (agg, _, _) = make_aggregator();
1214        let slice = agg.snapshot(UnixNanos::from(1000u64));
1215        // No quotes: ts_event falls back to ts_init
1216        assert_eq!(slice.ts_event, UnixNanos::from(1000u64));
1217    }
1218
1219    #[rstest]
1220    fn test_snapshot_retains_buffered_data_during_hysteresis_window() {
1221        // Setup: 3 strikes at 47500/50000/52500, AtmRelative +-1, hysteresis enabled
1222        let strikes = [47500, 50000, 52500];
1223        let mut instruments = HashMap::new();
1224
1225        for s in &strikes {
1226            let strike = Price::from(&s.to_string());
1227            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1228            instruments.insert(call_id, (strike, OptionKind::Call));
1229        }
1230        let tracker = AtmTracker::new();
1231        let mut agg = OptionChainAggregator::new(
1232            make_series_id(),
1233            StrikeRange::AtmRelative {
1234                strikes_above: 1,
1235                strikes_below: 1,
1236            },
1237            tracker,
1238            instruments,
1239        );
1240        agg.set_hysteresis(0.6);
1241        agg.set_cooldown_ns(0);
1242
1243        // Set ATM to 50000, rebalance -> active: {47500, 50000, 52500}
1244        set_atm_via_greeks(&mut agg, 50000.0);
1245        let action = agg.check_rebalance(now()).unwrap();
1246        agg.apply_rebalance(&action, now());
1247        assert_eq!(agg.instrument_ids().len(), 3);
1248
1249        // Buffer quotes for all active strikes
1250        let q1 = make_quote(
1251            InstrumentId::from("BTC-20240101-47500-C.DERIBIT"),
1252            "3000.00",
1253            "3100.00",
1254        );
1255        let q2 = make_quote(
1256            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1257            "1500.00",
1258            "1600.00",
1259        );
1260        let q3 = make_quote(
1261            InstrumentId::from("BTC-20240101-52500-C.DERIBIT"),
1262            "500.00",
1263            "600.00",
1264        );
1265        agg.update_quote(&q1);
1266        agg.update_quote(&q2);
1267        agg.update_quote(&q3);
1268        assert_eq!(agg.call_buffer_len(), 3);
1269
1270        // Move ATM slightly toward 52500 but within hysteresis band (no rebalance)
1271        set_atm_via_greeks(&mut agg, 51000.0);
1272        assert!(agg.check_rebalance(now()).is_none());
1273
1274        // Snapshot must still include all 3 buffered strikes
1275        let slice = agg.snapshot(UnixNanos::from(100u64));
1276        assert_eq!(slice.call_count(), 3);
1277    }
1278
1279    #[rstest]
1280    fn test_remove_instrument_from_catalog() {
1281        let (mut agg, call_id, put_id) = make_aggregator();
1282        assert_eq!(agg.instruments().len(), 2);
1283
1284        let removed = agg.remove_instrument(&call_id);
1285        assert!(removed);
1286        assert_eq!(agg.instruments().len(), 1);
1287        assert!(!agg.active_ids().contains(&call_id));
1288        assert!(agg.instruments().contains_key(&put_id));
1289    }
1290
1291    #[rstest]
1292    fn test_remove_instrument_cleans_buffer() {
1293        let (mut agg, call_id, _) = make_aggregator();
1294        let quote = make_quote(call_id, "100.00", "101.00");
1295        agg.update_quote(&quote);
1296        assert_eq!(agg.call_buffer_len(), 1);
1297
1298        let _ = agg.remove_instrument(&call_id);
1299        // No sibling call at same strike, buffer entry should be removed
1300        assert_eq!(agg.call_buffer_len(), 0);
1301    }
1302
1303    #[rstest]
1304    fn test_remove_instrument_preserves_sibling_buffer() {
1305        let (mut agg, call_id, _) = make_aggregator();
1306        // Add a second call at the same strike
1307        let sibling_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
1308        let strike = Price::from("50000");
1309        let _ = agg.add_instrument(sibling_id, strike, OptionKind::Call);
1310
1311        let quote = make_quote(call_id, "100.00", "101.00");
1312        agg.update_quote(&quote);
1313        assert_eq!(agg.call_buffer_len(), 1);
1314
1315        // Remove original: sibling still shares the strike+kind
1316        let _ = agg.remove_instrument(&call_id);
1317        assert_eq!(agg.call_buffer_len(), 1); // buffer preserved
1318        assert!(agg.instruments().contains_key(&sibling_id));
1319    }
1320
1321    #[rstest]
1322    fn test_remove_instrument_unknown_noop() {
1323        let (mut agg, _, _) = make_aggregator();
1324        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1325        assert!(!agg.remove_instrument(&unknown));
1326        assert_eq!(agg.instruments().len(), 2);
1327    }
1328
1329    #[rstest]
1330    fn test_remove_instrument_cleans_pending_greeks() {
1331        let (mut agg, call_id, _) = make_aggregator();
1332        let greeks = OptionGreeks {
1333            instrument_id: call_id,
1334            greeks: OptionGreekValues {
1335                delta: 0.55,
1336                ..Default::default()
1337            },
1338            ..Default::default()
1339        };
1340        agg.update_greeks(&greeks);
1341        assert_eq!(agg.pending_greeks_count(), 1);
1342
1343        let _ = agg.remove_instrument(&call_id);
1344        assert_eq!(agg.pending_greeks_count(), 0);
1345    }
1346
1347    #[rstest]
1348    fn test_is_catalog_empty_after_full_removal() {
1349        let (mut agg, call_id, put_id) = make_aggregator();
1350        assert!(!agg.is_catalog_empty());
1351
1352        let _ = agg.remove_instrument(&call_id);
1353        assert!(!agg.is_catalog_empty());
1354
1355        let _ = agg.remove_instrument(&put_id);
1356        assert!(agg.is_catalog_empty());
1357    }
1358
1359    // -- Expiry guard tests --
1360
1361    #[rstest]
1362    fn test_expired_quote_is_dropped() {
1363        let (mut agg, call_id, _) = make_aggregator();
1364        // Series expires at 1_700_000_000_000_000_000; send quote AT that timestamp
1365        let expired_quote = QuoteTick::new(
1366            call_id,
1367            Price::from("100.00"),
1368            Price::from("101.00"),
1369            Quantity::from("1.0"),
1370            Quantity::from("1.0"),
1371            UnixNanos::from(1_700_000_000_000_000_000u64),
1372            UnixNanos::from(1_700_000_000_000_000_000u64),
1373        );
1374        agg.update_quote(&expired_quote);
1375        assert!(agg.is_buffer_empty());
1376    }
1377
1378    #[rstest]
1379    fn test_expired_greeks_are_dropped() {
1380        let (mut agg, call_id, _) = make_aggregator();
1381        // First add a valid quote so greeks would normally land in the buffer
1382        let quote = make_quote(call_id, "100.00", "101.00");
1383        agg.update_quote(&quote);
1384        assert_eq!(agg.call_buffer_len(), 1);
1385
1386        // Send greeks at expiry timestamp: should be dropped
1387        let greeks = OptionGreeks {
1388            instrument_id: call_id,
1389            ts_event: UnixNanos::from(1_700_000_000_000_000_000u64),
1390            greeks: OptionGreekValues {
1391                delta: 0.55,
1392                ..Default::default()
1393            },
1394            ..Default::default()
1395        };
1396        agg.update_greeks(&greeks);
1397
1398        let strike = Price::from("50000");
1399        assert!(agg.get_call_greeks_from_buffer(&strike).is_none());
1400    }
1401
1402    // -- Delta range tests --
1403
1404    /// Builds a `Delta`-range aggregator over `strikes` (call + put per strike),
1405    /// with hysteresis and cooldown disabled so rebalance decisions reflect pure
1406    /// delta resolution.
1407    fn make_delta_aggregator(
1408        strikes: &[i64],
1409        target: f64,
1410        tolerance: f64,
1411    ) -> OptionChainAggregator {
1412        let mut instruments = HashMap::new();
1413
1414        for s in strikes {
1415            let strike = Price::from(&s.to_string());
1416            instruments.insert(option_id(*s, OptionKind::Call), (strike, OptionKind::Call));
1417            instruments.insert(option_id(*s, OptionKind::Put), (strike, OptionKind::Put));
1418        }
1419        let tracker = AtmTracker::new();
1420        let mut agg = OptionChainAggregator::new(
1421            make_series_id(),
1422            StrikeRange::Delta { target, tolerance },
1423            tracker,
1424            instruments,
1425        );
1426        agg.set_hysteresis(0.0);
1427        agg.set_cooldown_ns(0);
1428        agg
1429    }
1430
1431    fn option_id(strike: i64, kind: OptionKind) -> InstrumentId {
1432        let suffix = match kind {
1433            OptionKind::Call => "C",
1434            OptionKind::Put => "P",
1435        };
1436        InstrumentId::from(&format!("BTC-20240101-{strike}-{suffix}.DERIBIT"))
1437    }
1438
1439    /// Feeds a quote then greeks (with the given `delta`) for one option leg.
1440    fn feed_quote_and_greeks(
1441        agg: &mut OptionChainAggregator,
1442        strike: i64,
1443        kind: OptionKind,
1444        delta: f64,
1445    ) {
1446        let id = option_id(strike, kind);
1447        agg.update_quote(&make_quote(id, "100.00", "101.00"));
1448        agg.update_greeks(&OptionGreeks {
1449            instrument_id: id,
1450            greeks: OptionGreekValues {
1451                delta,
1452                ..Default::default()
1453            },
1454            ..Default::default()
1455        });
1456    }
1457
1458    #[rstest]
1459    #[case(0.30, 0.30, 0.03, true)] // exact target
1460    #[case(-0.30, 0.30, 0.03, true)] // negative delta, magnitude matches
1461    #[case(0.28, 0.30, 0.03, true)] // inside band, below target
1462    #[case(0.32, 0.30, 0.03, true)] // inside band, above target
1463    #[case(0.20, 0.30, 0.03, false)] // below band
1464    #[case(0.40, 0.30, 0.03, false)] // above band
1465    fn test_delta_within_band(
1466        #[case] delta: f64,
1467        #[case] target: f64,
1468        #[case] tolerance: f64,
1469        #[case] expected: bool,
1470    ) {
1471        assert_eq!(
1472            OptionChainAggregator::delta_within_band(delta, target, tolerance),
1473            expected
1474        );
1475    }
1476
1477    #[rstest]
1478    fn test_delta_target_hit() {
1479        let strikes = [40000, 45000, 50000, 55000, 60000];
1480        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1481        // Bootstrap the ATM-relative fallback so the window is active and greeks can land.
1482        set_atm_via_greeks(&mut agg, 50000.0);
1483        agg.recompute_active_set();
1484        assert_eq!(agg.instrument_ids().len(), 10); // all 5 strikes (fallback)
1485
1486        // Only the 55000 call sits at the 0.30 target.
1487        feed_quote_and_greeks(&mut agg, 40000, OptionKind::Call, 0.95);
1488        feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.80);
1489        feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.55);
1490        feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1491        feed_quote_and_greeks(&mut agg, 60000, OptionKind::Call, 0.12);
1492
1493        let active = agg.recompute_active_set();
1494        assert_eq!(active.len(), 2); // 55000 call + put
1495        assert!(active.contains(&option_id(55000, OptionKind::Call)));
1496        assert!(active.contains(&option_id(55000, OptionKind::Put)));
1497    }
1498
1499    #[rstest]
1500    fn test_delta_tolerance_band() {
1501        let strikes = [45000, 50000, 55000, 60000];
1502        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.05);
1503        set_atm_via_greeks(&mut agg, 50000.0);
1504        agg.recompute_active_set();
1505
1506        // Band is [0.25, 0.35]: 0.50 and 0.10 are outside, 0.32 and 0.30 inside.
1507        feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.50);
1508        feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.32);
1509        feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1510        feed_quote_and_greeks(&mut agg, 60000, OptionKind::Call, 0.10);
1511
1512        let active = agg.recompute_active_set();
1513        assert_eq!(active.len(), 4); // 50000 + 55000, both legs each
1514        assert!(active.contains(&option_id(50000, OptionKind::Call)));
1515        assert!(active.contains(&option_id(55000, OptionKind::Call)));
1516        assert!(!active.contains(&option_id(45000, OptionKind::Call)));
1517        assert!(!active.contains(&option_id(60000, OptionKind::Call)));
1518    }
1519
1520    #[rstest]
1521    fn test_delta_matches_put_by_magnitude() {
1522        let strikes = [45000, 50000, 55000];
1523        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1524        set_atm_via_greeks(&mut agg, 50000.0);
1525        agg.recompute_active_set();
1526
1527        // Only the 45000 put reports greeks, isolating put-side matching.
1528        feed_quote_and_greeks(&mut agg, 45000, OptionKind::Put, -0.30);
1529
1530        let active = agg.recompute_active_set();
1531        // |-0.30| == target, so the 45000 strike (both legs) is selected.
1532        assert_eq!(active.len(), 2);
1533        assert!(active.contains(&option_id(45000, OptionKind::Put)));
1534        assert!(active.contains(&option_id(45000, OptionKind::Call)));
1535    }
1536
1537    #[rstest]
1538    fn test_delta_no_greeks_falls_back_to_atm_window() {
1539        // 13 strikes so the ATM-relative fallback window is a proper subset.
1540        let strikes: Vec<i64> = (0..13).map(|i| 40000 + i * 1000).collect();
1541        let mut agg = make_delta_aggregator(&strikes, 0.25, 0.05);
1542        set_atm_via_greeks(&mut agg, 46000.0); // centered
1543
1544        let active = agg.recompute_active_set();
1545
1546        // No greeks -> a bounded ATM-relative window: neither empty nor the full chain.
1547        // The exact window width is asserted in the model-level resolve test.
1548        assert!(active.len() > 2);
1549        assert!(active.len() < strikes.len() * 2);
1550        assert!(active.contains(&option_id(46000, OptionKind::Call))); // ATM included
1551        assert!(!active.contains(&option_id(40000, OptionKind::Call))); // extreme excluded
1552        assert!(!active.contains(&option_id(52000, OptionKind::Call))); // extreme excluded
1553    }
1554
1555    #[rstest]
1556    fn test_delta_pending_only_greeks_eligible() {
1557        let strikes = [45000, 50000, 55000];
1558        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1559        set_atm_via_greeks(&mut agg, 50000.0);
1560        agg.recompute_active_set();
1561
1562        // Greeks arrive before any quote, so they land in pending_greeks.
1563        agg.update_greeks(&OptionGreeks {
1564            instrument_id: option_id(55000, OptionKind::Call),
1565            greeks: OptionGreekValues {
1566                delta: 0.30,
1567                ..Default::default()
1568            },
1569            ..Default::default()
1570        });
1571        assert_eq!(agg.pending_greeks_count(), 1);
1572
1573        let active = agg.recompute_active_set();
1574        // Pending-only greeks are eligible for delta resolution.
1575        assert_eq!(active.len(), 2);
1576        assert!(active.contains(&option_id(55000, OptionKind::Call)));
1577    }
1578
1579    #[rstest]
1580    fn test_delta_rebalances_on_greeks_with_atm_unchanged() {
1581        let strikes = [45000, 50000, 55000];
1582        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1583        set_atm_via_greeks(&mut agg, 50000.0);
1584        agg.recompute_active_set();
1585        assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));
1586        assert_eq!(agg.instrument_ids().len(), 6); // fallback: all 3 strikes
1587
1588        // Greeks arrive; only 55000 matches. The closest ATM strike is unchanged.
1589        feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.55);
1590        feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.45);
1591        feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1592
1593        let action = agg
1594            .check_rebalance(now())
1595            .expect("delta range should rebalance when greeks narrow the set");
1596        assert!(action.add.is_empty());
1597        assert!(!action.remove.is_empty());
1598
1599        agg.apply_rebalance(&action, now());
1600        assert_eq!(agg.instrument_ids().len(), 2); // narrowed to the 55000 legs
1601        assert!(
1602            agg.active_ids()
1603                .contains(&option_id(55000, OptionKind::Call))
1604        );
1605    }
1606
1607    #[rstest]
1608    fn test_delta_no_op_rebalance_returns_none() {
1609        let strikes = [45000, 50000, 55000];
1610        let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1611        set_atm_via_greeks(&mut agg, 50000.0);
1612        agg.recompute_active_set();
1613        feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.55);
1614        feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.45);
1615        feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1616
1617        // First rebalance narrows to the 55000 legs.
1618        let action = agg.check_rebalance(now()).unwrap();
1619        agg.apply_rebalance(&action, now());
1620        assert_eq!(agg.instrument_ids().len(), 2);
1621
1622        // Greeks unchanged -> stable set -> no-op suppressed (cooldown disabled).
1623        assert!(agg.check_rebalance(now()).is_none());
1624    }
1625}