1use std::collections::{BTreeMap, HashMap, HashSet};
19
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 QuoteTick,
24 option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange},
25 },
26 enums::OptionKind,
27 identifiers::{InstrumentId, OptionSeriesId},
28 types::Price,
29};
30
31use super::{
32 AtmTracker,
33 constants::{DEFAULT_REBALANCE_COOLDOWN_NS, DEFAULT_REBALANCE_HYSTERESIS},
34};
35
36#[derive(Debug)]
41pub struct OptionChainAggregator {
42 series_id: OptionSeriesId,
44 strike_range: StrikeRange,
46 atm_tracker: AtmTracker,
48 instruments: HashMap<InstrumentId, (Price, OptionKind)>,
51 active_ids: HashSet<InstrumentId>,
53 last_atm_strike: Option<Price>,
55 hysteresis: f64,
57 cooldown_ns: u64,
59 last_rebalance_ns: Option<UnixNanos>,
61 max_ts_event: UnixNanos,
63 pending_greeks: HashMap<InstrumentId, OptionGreeks>,
65 call_buffer: BTreeMap<Price, OptionStrikeData>,
67 put_buffer: BTreeMap<Price, OptionStrikeData>,
69}
70
71impl OptionChainAggregator {
72 pub fn new(
79 series_id: OptionSeriesId,
80 strike_range: StrikeRange,
81 atm_tracker: AtmTracker,
82 instruments: HashMap<InstrumentId, (Price, OptionKind)>,
83 ) -> Self {
84 let mut aggregator = Self {
85 series_id,
86 strike_range,
87 atm_tracker,
88 instruments,
89 active_ids: HashSet::new(),
90 last_atm_strike: None,
91 hysteresis: DEFAULT_REBALANCE_HYSTERESIS,
92 cooldown_ns: DEFAULT_REBALANCE_COOLDOWN_NS,
93 last_rebalance_ns: None,
94 max_ts_event: UnixNanos::default(),
95 pending_greeks: HashMap::new(),
96 call_buffer: BTreeMap::new(),
97 put_buffer: BTreeMap::new(),
98 };
99 aggregator.recompute_active_set();
101 aggregator
102 }
103
104 pub fn atm_tracker_mut(&mut self) -> &mut AtmTracker {
106 &mut self.atm_tracker
107 }
108
109 #[must_use]
111 pub fn instrument_ids(&self) -> Vec<InstrumentId> {
112 self.active_ids.iter().copied().collect()
113 }
114
115 #[must_use]
117 pub fn active_ids(&self) -> &HashSet<InstrumentId> {
118 &self.active_ids
119 }
120
121 #[must_use]
123 pub fn series_id(&self) -> OptionSeriesId {
124 self.series_id
125 }
126
127 #[must_use]
129 pub fn is_expired(&self, now_ns: UnixNanos) -> bool {
130 now_ns >= self.series_id.expiration_ns
131 }
132
133 #[must_use]
135 pub fn instruments(&self) -> &HashMap<InstrumentId, (Price, OptionKind)> {
136 &self.instruments
137 }
138
139 #[must_use]
141 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
142 self.instruments.keys().copied().collect()
143 }
144
145 #[must_use]
147 pub fn is_catalog_empty(&self) -> bool {
148 self.instruments.is_empty()
149 }
150
151 #[must_use]
157 pub fn remove_instrument(&mut self, instrument_id: &InstrumentId) -> bool {
158 let Some((strike, kind)) = self.instruments.remove(instrument_id) else {
159 return false;
160 };
161
162 self.active_ids.remove(instrument_id);
163 self.pending_greeks.remove(instrument_id);
164
165 let has_sibling = self
167 .instruments
168 .values()
169 .any(|(s, k)| *s == strike && *k == kind);
170
171 if !has_sibling {
172 let buffer = match kind {
173 OptionKind::Call => &mut self.call_buffer,
174 OptionKind::Put => &mut self.put_buffer,
175 };
176 buffer.remove(&strike);
177 }
178
179 true
180 }
181
182 #[must_use]
184 pub fn atm_tracker(&self) -> &AtmTracker {
185 &self.atm_tracker
186 }
187
188 pub fn recompute_active_set(&mut self) -> Vec<InstrumentId> {
193 let atm_price = self.atm_tracker.atm_price();
194 let all_strikes = Self::sorted_strikes(&self.instruments);
195 let active_strikes: HashSet<Price> = self
196 .resolve_active_strikes(atm_price, &all_strikes)
197 .into_iter()
198 .collect();
199 self.active_ids = self
200 .instruments
201 .iter()
202 .filter(|(_, (strike, _))| active_strikes.contains(strike))
203 .map(|(id, _)| *id)
204 .collect();
205 self.last_atm_strike =
206 atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
207 self.active_ids.iter().copied().collect()
208 }
209
210 fn resolve_active_strikes(
215 &self,
216 atm_price: Option<Price>,
217 all_strikes: &[Price],
218 ) -> Vec<Price> {
219 match &self.strike_range {
220 StrikeRange::Delta { target, tolerance } => {
221 self.resolve_delta(*target, *tolerance, atm_price, all_strikes)
222 }
223 _ => self.strike_range.resolve(atm_price, all_strikes),
224 }
225 }
226
227 fn resolve_delta(
237 &self,
238 target: f64,
239 tolerance: f64,
240 atm_price: Option<Price>,
241 all_strikes: &[Price],
242 ) -> Vec<Price> {
243 let selected: Vec<Price> = self
244 .deltas_by_strike()
245 .into_iter()
246 .filter(|(_, deltas)| {
247 deltas
248 .iter()
249 .any(|delta| Self::delta_within_band(*delta, target, tolerance))
250 })
251 .map(|(strike, _)| strike)
252 .collect();
253
254 if selected.is_empty() {
255 return self.strike_range.resolve(atm_price, all_strikes);
256 }
257
258 selected
259 }
260
261 fn deltas_by_strike(&self) -> BTreeMap<Price, Vec<f64>> {
264 let mut deltas_by_strike: BTreeMap<Price, Vec<f64>> = BTreeMap::new();
265
266 for (strike, data) in self.call_buffer.iter().chain(self.put_buffer.iter()) {
267 if let Some(greeks) = data.greeks.as_ref() {
268 deltas_by_strike
269 .entry(*strike)
270 .or_default()
271 .push(greeks.delta);
272 }
273 }
274
275 for (id, greeks) in &self.pending_greeks {
276 if let Some((strike, _)) = self.instruments.get(id) {
277 deltas_by_strike
278 .entry(*strike)
279 .or_default()
280 .push(greeks.delta);
281 }
282 }
283
284 deltas_by_strike
285 }
286
287 fn delta_within_band(delta: f64, target: f64, tolerance: f64) -> bool {
292 (delta.abs() - target).abs() <= tolerance
293 }
294
295 #[must_use]
302 pub fn add_instrument(
303 &mut self,
304 instrument_id: InstrumentId,
305 strike: Price,
306 kind: OptionKind,
307 ) -> bool {
308 if self.instruments.contains_key(&instrument_id) {
309 return false;
310 }
311
312 self.instruments.insert(instrument_id, (strike, kind));
313
314 let all_strikes = Self::sorted_strikes(&self.instruments);
316 let atm_price = self.atm_tracker.atm_price();
317 let active_strikes: HashSet<Price> = self
318 .resolve_active_strikes(atm_price, &all_strikes)
319 .into_iter()
320 .collect();
321
322 if active_strikes.contains(&strike) {
323 self.active_ids.insert(instrument_id);
324 }
325
326 true
327 }
328
329 fn sorted_strikes(instruments: &HashMap<InstrumentId, (Price, OptionKind)>) -> Vec<Price> {
331 let mut strikes: Vec<Price> = instruments.values().map(|(s, _)| *s).collect();
332 strikes.sort();
333 strikes.dedup();
334 strikes
335 }
336
337 fn find_closest_strike(all_strikes: &[Price], atm: Price) -> Option<Price> {
339 all_strikes
340 .iter()
341 .min_by(|a, b| {
342 let da = (a.as_f64() - atm.as_f64()).abs();
343 let db = (b.as_f64() - atm.as_f64()).abs();
344 da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
345 })
346 .copied()
347 }
348
349 pub fn update_quote(&mut self, quote: &QuoteTick) {
351 if self.is_expired(quote.ts_event) {
352 log::warn!(
353 "Dropping quote for {}, series {} expired at {}",
354 quote.instrument_id,
355 self.series_id,
356 self.series_id.expiration_ns,
357 );
358 return;
359 }
360
361 if !self.active_ids.contains("e.instrument_id) {
362 return;
363 }
364
365 if let Some(&(strike, kind)) = self.instruments.get("e.instrument_id) {
366 if quote.ts_event > self.max_ts_event {
368 self.max_ts_event = quote.ts_event;
369 }
370
371 let buffer = match kind {
372 OptionKind::Call => &mut self.call_buffer,
373 OptionKind::Put => &mut self.put_buffer,
374 };
375
376 match buffer.get_mut(&strike) {
377 Some(data) => data.quote = *quote,
378 None => {
379 let greeks = self.pending_greeks.remove("e.instrument_id);
381 buffer.insert(
382 strike,
383 OptionStrikeData {
384 quote: *quote,
385 greeks,
386 },
387 );
388 }
389 }
390 }
391 }
392
393 pub fn update_greeks(&mut self, greeks: &OptionGreeks) {
399 if self.is_expired(greeks.ts_event) {
400 log::warn!(
401 "Dropping greeks for {}, series {} expired at {}",
402 greeks.instrument_id,
403 self.series_id,
404 self.series_id.expiration_ns,
405 );
406 return;
407 }
408
409 if !self.active_ids.contains(&greeks.instrument_id) {
410 return;
411 }
412
413 if let Some(&(strike, kind)) = self.instruments.get(&greeks.instrument_id) {
414 let buffer = match kind {
415 OptionKind::Call => &mut self.call_buffer,
416 OptionKind::Put => &mut self.put_buffer,
417 };
418
419 match buffer.get_mut(&strike) {
420 Some(data) => data.greeks = Some(*greeks),
421 None => {
422 self.pending_greeks.insert(greeks.instrument_id, *greeks);
424 }
425 }
426 }
427 }
428
429 pub fn snapshot(&self, ts_init: UnixNanos) -> OptionChainSlice {
438 let atm_price = self.atm_tracker.atm_price();
439
440 let catalog_strikes = Self::sorted_strikes(&self.instruments);
442 let atm_strike = atm_price.and_then(|atm| Self::find_closest_strike(&catalog_strikes, atm));
443
444 let active_strikes: HashSet<Price> = self
448 .active_ids
449 .iter()
450 .filter_map(|id| self.instruments.get(id).map(|(s, _)| *s))
451 .collect();
452
453 let mut calls = BTreeMap::new();
455
456 for (strike, data) in &self.call_buffer {
457 if active_strikes.contains(strike) {
458 calls.insert(*strike, data.clone());
459 }
460 }
461 let mut puts = BTreeMap::new();
462
463 for (strike, data) in &self.put_buffer {
464 if active_strikes.contains(strike) {
465 puts.insert(*strike, data.clone());
466 }
467 }
468
469 let ts_event = if self.max_ts_event == UnixNanos::default() {
471 ts_init
472 } else {
473 self.max_ts_event
474 };
475
476 OptionChainSlice {
477 series_id: self.series_id,
478 atm_strike,
479 calls,
480 puts,
481 ts_event,
482 ts_init,
483 }
484 }
485
486 #[must_use]
488 pub fn is_buffer_empty(&self) -> bool {
489 self.call_buffer.is_empty() && self.put_buffer.is_empty()
490 }
491
492 #[must_use]
504 pub fn check_rebalance(&self, now_ns: UnixNanos) -> Option<RebalanceAction> {
505 if matches!(self.strike_range, StrikeRange::Fixed(_)) {
507 return None;
508 }
509
510 let atm_price = self.atm_tracker.atm_price()?;
511 let all_strikes = Self::sorted_strikes(&self.instruments);
512 let current_atm_strike = Self::find_closest_strike(&all_strikes, atm_price)?;
513
514 let is_delta = matches!(self.strike_range, StrikeRange::Delta { .. });
515
516 if !is_delta {
517 if self.last_atm_strike == Some(current_atm_strike) {
519 return None;
520 }
521
522 if let Some(last_strike) = self.last_atm_strike
524 && self.hysteresis > 0.0
525 {
526 let last_f = last_strike.as_f64();
527 let atm_f = atm_price.as_f64();
528 let direction = atm_f - last_f;
529
530 let next_strike = if direction > 0.0 {
532 all_strikes.iter().find(|s| s.as_f64() > last_f)
533 } else {
534 all_strikes.iter().rev().find(|s| s.as_f64() < last_f)
535 };
536
537 if let Some(next) = next_strike {
538 let gap = (next.as_f64() - last_f).abs();
539 let threshold = last_f + direction.signum() * self.hysteresis * gap;
540 if direction > 0.0 && atm_f < threshold {
542 return None;
543 }
544
545 if direction < 0.0 && atm_f > threshold {
546 return None;
547 }
548 }
549 }
550 }
551
552 if self.cooldown_ns > 0
554 && let Some(last_ts) = self.last_rebalance_ns
555 && now_ns.as_u64().saturating_sub(last_ts.as_u64()) < self.cooldown_ns
556 {
557 return None;
558 }
559
560 let new_active_strikes: HashSet<Price> = self
562 .resolve_active_strikes(Some(atm_price), &all_strikes)
563 .into_iter()
564 .collect();
565 let new_active: HashSet<InstrumentId> = self
566 .instruments
567 .iter()
568 .filter(|(_, (s, _))| new_active_strikes.contains(s))
569 .map(|(id, _)| *id)
570 .collect();
571
572 let add: Vec<InstrumentId> = new_active.difference(&self.active_ids).copied().collect();
573 let remove: Vec<InstrumentId> = self.active_ids.difference(&new_active).copied().collect();
574
575 if is_delta && add.is_empty() && remove.is_empty() {
578 return None;
579 }
580
581 Some(RebalanceAction { add, remove })
582 }
583
584 pub fn apply_rebalance(&mut self, action: &RebalanceAction, now_ns: UnixNanos) {
587 for id in &action.add {
588 self.active_ids.insert(*id);
589 }
590
591 for id in &action.remove {
592 self.active_ids.remove(id);
593 }
594
595 let active_strikes: HashSet<Price> = self
597 .active_ids
598 .iter()
599 .filter_map(|id| self.instruments.get(id))
600 .map(|(s, _)| *s)
601 .collect();
602 self.call_buffer
603 .retain(|strike, _| active_strikes.contains(strike));
604 self.put_buffer
605 .retain(|strike, _| active_strikes.contains(strike));
606 self.pending_greeks
607 .retain(|id, _| self.active_ids.contains(id));
608
609 if let Some(atm) = self.atm_tracker.atm_price() {
611 let all_strikes = Self::sorted_strikes(&self.instruments);
612 self.last_atm_strike = Self::find_closest_strike(&all_strikes, atm);
613 }
614 self.last_rebalance_ns = Some(now_ns);
615 }
616}
617
618#[derive(Clone, Debug, PartialEq, Eq)]
620pub struct RebalanceAction {
621 pub add: Vec<InstrumentId>,
623 pub remove: Vec<InstrumentId>,
625}
626
627#[cfg(test)]
628impl OptionChainAggregator {
629 fn call_buffer_len(&self) -> usize {
630 self.call_buffer.len()
631 }
632
633 fn put_buffer_len(&self) -> usize {
634 self.put_buffer.len()
635 }
636
637 fn get_call_greeks_from_buffer(&self, strike: &Price) -> Option<&OptionGreeks> {
638 self.call_buffer.get(strike).and_then(|d| d.greeks.as_ref())
639 }
640
641 pub(crate) fn last_atm_strike(&self) -> Option<Price> {
642 self.last_atm_strike
643 }
644
645 fn set_hysteresis(&mut self, h: f64) {
646 self.hysteresis = h;
647 }
648
649 fn set_cooldown_ns(&mut self, ns: u64) {
650 self.cooldown_ns = ns;
651 }
652
653 fn pending_greeks_count(&self) -> usize {
654 self.pending_greeks.len()
655 }
656}
657
658#[cfg(test)]
659mod tests {
660 use nautilus_model::{data::greeks::OptionGreekValues, identifiers::Venue, types::Quantity};
661 use rstest::*;
662
663 use super::*;
664
665 fn make_series_id() -> OptionSeriesId {
666 OptionSeriesId::new(
667 Venue::new("DERIBIT"),
668 ustr::Ustr::from("BTC"),
669 ustr::Ustr::from("BTC"),
670 UnixNanos::from(1_700_000_000_000_000_000u64),
671 )
672 }
673
674 fn make_quote(instrument_id: InstrumentId, bid: &str, ask: &str) -> QuoteTick {
675 QuoteTick::new(
676 instrument_id,
677 Price::from(bid),
678 Price::from(ask),
679 Quantity::from("1.0"),
680 Quantity::from("1.0"),
681 UnixNanos::from(1u64),
682 UnixNanos::from(1u64),
683 )
684 }
685
686 fn now() -> UnixNanos {
687 UnixNanos::from(1_000_000_000_000_000_000u64)
689 }
690
691 fn set_atm_via_greeks(agg: &mut OptionChainAggregator, price: f64) {
693 let greeks = OptionGreeks {
694 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
695 underlying_price: Some(price),
696 ..Default::default()
697 };
698 agg.atm_tracker_mut().update_from_option_greeks(&greeks);
699 }
700
701 fn make_aggregator() -> (OptionChainAggregator, InstrumentId, InstrumentId) {
702 let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
703 let put_id = InstrumentId::from("BTC-20240101-50000-P.DERIBIT");
704 let strike = Price::from("50000");
705
706 let mut instrument_map = HashMap::new();
707 instrument_map.insert(call_id, (strike, OptionKind::Call));
708 instrument_map.insert(put_id, (strike, OptionKind::Put));
709
710 let tracker = AtmTracker::new();
711 let agg = OptionChainAggregator::new(
712 make_series_id(),
713 StrikeRange::Fixed(vec![strike]),
714 tracker,
715 instrument_map,
716 );
717
718 (agg, call_id, put_id)
719 }
720
721 #[rstest]
722 fn test_aggregator_instrument_ids() {
723 let (agg, call_id, put_id) = make_aggregator();
724 let ids = agg.instrument_ids();
725 assert_eq!(ids.len(), 2);
726 assert!(ids.contains(&call_id));
727 assert!(ids.contains(&put_id));
728 }
729
730 #[rstest]
731 fn test_aggregator_update_quote() {
732 let (mut agg, call_id, _) = make_aggregator();
733 let quote = make_quote(call_id, "100.00", "101.00");
734
735 agg.update_quote("e);
736
737 assert_eq!(agg.call_buffer_len(), 1);
738 assert_eq!(agg.put_buffer_len(), 0);
739 }
740
741 #[rstest]
742 fn test_aggregator_update_greeks() {
743 let (mut agg, call_id, _) = make_aggregator();
744 let quote = make_quote(call_id, "100.00", "101.00");
745 agg.update_quote("e);
746
747 let greeks = OptionGreeks {
748 instrument_id: call_id,
749 greeks: OptionGreekValues {
750 delta: 0.55,
751 ..Default::default()
752 },
753 ..Default::default()
754 };
755 agg.update_greeks(&greeks);
756
757 let strike = Price::from("50000");
758 let data = agg.get_call_greeks_from_buffer(&strike);
759 assert!(data.is_some());
760 assert_eq!(data.unwrap().delta, 0.55);
761 }
762
763 #[rstest]
764 fn test_aggregator_snapshot_preserves_state() {
765 let (mut agg, call_id, _) = make_aggregator();
766 let quote = make_quote(call_id, "100.00", "101.00");
767 agg.update_quote("e);
768
769 let slice = agg.snapshot(UnixNanos::from(100u64));
770 assert_eq!(slice.call_count(), 1);
771 assert_eq!(slice.ts_init, UnixNanos::from(100u64));
772
773 assert!(!agg.is_buffer_empty());
775
776 let slice2 = agg.snapshot(UnixNanos::from(200u64));
778 assert_eq!(slice2.call_count(), 1);
779 assert_eq!(slice2.ts_init, UnixNanos::from(200u64));
780 }
781
782 #[rstest]
783 fn test_aggregator_ignores_unknown_instrument() {
784 let (mut agg, _, _) = make_aggregator();
785 let unknown_id = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
786 let quote = make_quote(unknown_id, "100.00", "101.00");
787
788 agg.update_quote("e);
789
790 assert!(agg.is_buffer_empty());
791 }
792
793 #[rstest]
794 fn test_check_rebalance_returns_none() {
795 let (agg, _, _) = make_aggregator();
796 assert!(agg.check_rebalance(now()).is_none());
797 }
798
799 fn make_multi_strike_aggregator() -> OptionChainAggregator {
804 let strikes = [45000, 47500, 50000, 52500, 55000];
805 let mut instruments = HashMap::new();
806
807 for s in &strikes {
808 let strike = Price::from(&s.to_string());
809 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
810 let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
811 instruments.insert(call_id, (strike, OptionKind::Call));
812 instruments.insert(put_id, (strike, OptionKind::Put));
813 }
814
815 let tracker = AtmTracker::new();
816 let mut agg = OptionChainAggregator::new(
817 make_series_id(),
818 StrikeRange::AtmRelative {
819 strikes_above: 1,
820 strikes_below: 1,
821 },
822 tracker,
823 instruments,
824 );
825 agg.set_hysteresis(0.0);
827 agg.set_cooldown_ns(0);
828 agg
829 }
830
831 #[rstest]
832 fn test_check_rebalance_fixed_always_none() {
833 let (mut agg, _, _) = make_aggregator();
835 set_atm_via_greeks(&mut agg, 50000.0);
836 assert!(agg.check_rebalance(now()).is_none());
837 }
838
839 #[rstest]
840 fn test_check_rebalance_no_atm_returns_none() {
841 let agg = make_multi_strike_aggregator();
842 assert!(agg.check_rebalance(now()).is_none());
844 }
845
846 #[rstest]
847 fn test_check_rebalance_atm_unchanged_returns_none() {
848 let mut agg = make_multi_strike_aggregator();
849 set_atm_via_greeks(&mut agg, 50000.0);
851 let action = agg.check_rebalance(now()).unwrap();
853 agg.apply_rebalance(&action, now());
854
855 set_atm_via_greeks(&mut agg, 50200.0);
857 assert!(agg.check_rebalance(now()).is_none());
858 }
859
860 #[rstest]
861 fn test_check_rebalance_detects_atm_shift() {
862 let mut agg = make_multi_strike_aggregator();
863 set_atm_via_greeks(&mut agg, 50000.0);
865 let action = agg.check_rebalance(now()).unwrap();
866 agg.apply_rebalance(&action, now());
867 assert_eq!(agg.instrument_ids().len(), 6); set_atm_via_greeks(&mut agg, 55000.0);
872 let action2 = agg.check_rebalance(now()).unwrap();
873 assert!(!action2.add.is_empty() || !action2.remove.is_empty());
875 }
876
877 #[rstest]
878 fn test_apply_rebalance_updates_instrument_map() {
879 let mut agg = make_multi_strike_aggregator();
880 set_atm_via_greeks(&mut agg, 50000.0);
882 let action = agg.check_rebalance(now()).unwrap();
883 agg.apply_rebalance(&action, now());
884
885 let active_ids = agg.instrument_ids();
887 assert_eq!(active_ids.len(), 6); set_atm_via_greeks(&mut agg, 55000.0);
891 let action2 = agg.check_rebalance(now()).unwrap();
892 agg.apply_rebalance(&action2, now());
893
894 let active_ids2 = agg.instrument_ids();
896 assert_eq!(active_ids2.len(), 4); }
898
899 #[rstest]
900 fn test_apply_rebalance_cleans_buffers() {
901 let mut agg = make_multi_strike_aggregator();
902 set_atm_via_greeks(&mut agg, 50000.0);
904 let action = agg.check_rebalance(now()).unwrap();
905 agg.apply_rebalance(&action, now());
906
907 let call_47500 = InstrumentId::from("BTC-20240101-47500-C.DERIBIT");
909 let quote = make_quote(call_47500, "100.00", "101.00");
910 agg.update_quote("e);
911 assert_eq!(agg.call_buffer_len(), 1);
912
913 set_atm_via_greeks(&mut agg, 55000.0);
915 let action2 = agg.check_rebalance(now()).unwrap();
916 agg.apply_rebalance(&action2, now());
917
918 assert_eq!(agg.call_buffer_len(), 0);
920 }
921
922 #[rstest]
923 fn test_initial_active_set_empty_when_no_atm() {
924 let agg = make_multi_strike_aggregator();
925 assert_eq!(agg.instrument_ids().len(), 0);
927 assert_eq!(agg.all_instrument_ids().len(), 10);
928 }
929
930 #[rstest]
931 fn test_catalog_vs_active_separation() {
932 let mut agg = make_multi_strike_aggregator();
933 set_atm_via_greeks(&mut agg, 50000.0);
935 let action = agg.check_rebalance(now()).unwrap();
936 agg.apply_rebalance(&action, now());
937
938 assert_eq!(agg.instruments().len(), 10);
940 assert_eq!(agg.instrument_ids().len(), 6);
942 }
943
944 #[rstest]
947 fn test_add_instrument_already_known() {
948 let (mut agg, call_id, _) = make_aggregator();
949 let strike = Price::from("50000");
950 let count_before = agg.instruments().len();
951
952 let result = agg.add_instrument(call_id, strike, OptionKind::Call);
953
954 assert!(!result);
955 assert_eq!(agg.instruments().len(), count_before);
956 }
957
958 #[rstest]
959 fn test_add_instrument_new_in_active_range() {
960 let (mut agg, _, _) = make_aggregator();
961 let new_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
963 let strike = Price::from("50000");
964
965 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
966
967 assert!(result);
968 assert_eq!(agg.instruments().len(), 3);
969 assert!(agg.active_ids().contains(&new_id));
970 }
971
972 #[rstest]
973 fn test_add_instrument_new_out_of_range() {
974 let (mut agg, _, _) = make_aggregator();
975 let new_id = InstrumentId::from("BTC-20240101-60000-C.DERIBIT");
977 let strike = Price::from("60000");
978
979 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
980
981 assert!(result);
982 assert_eq!(agg.instruments().len(), 3);
983 assert!(!agg.active_ids().contains(&new_id));
984 }
985
986 #[rstest]
987 fn test_add_instrument_available_for_rebalance() {
988 let mut agg = make_multi_strike_aggregator();
989 set_atm_via_greeks(&mut agg, 50000.0);
991 let action = agg.check_rebalance(now()).unwrap();
992 agg.apply_rebalance(&action, now());
993 assert_eq!(agg.instrument_ids().len(), 6);
995
996 let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
998 let strike = Price::from("57500");
999 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
1000 assert!(result);
1001 assert!(!agg.active_ids().contains(&new_id));
1002
1003 set_atm_via_greeks(&mut agg, 57500.0);
1005 let action2 = agg.check_rebalance(now()).unwrap();
1006 agg.apply_rebalance(&action2, now());
1007
1008 assert!(agg.active_ids().contains(&new_id));
1009 }
1010
1011 #[rstest]
1014 fn test_hysteresis_blocks_small_movement() {
1015 let strikes = [47500, 50000, 52500];
1016 let mut instruments = HashMap::new();
1017
1018 for s in &strikes {
1019 let strike = Price::from(&s.to_string());
1020 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1021 instruments.insert(call_id, (strike, OptionKind::Call));
1022 }
1023 let tracker = AtmTracker::new();
1024 let mut agg = OptionChainAggregator::new(
1025 make_series_id(),
1026 StrikeRange::AtmRelative {
1027 strikes_above: 1,
1028 strikes_below: 1,
1029 },
1030 tracker,
1031 instruments,
1032 );
1033 agg.set_hysteresis(0.6);
1034 agg.set_cooldown_ns(0);
1035
1036 set_atm_via_greeks(&mut agg, 50000.0);
1038 let action = agg.check_rebalance(now()).unwrap();
1039 agg.apply_rebalance(&action, now());
1040 assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));
1041
1042 set_atm_via_greeks(&mut agg, 51000.0);
1045 assert!(agg.check_rebalance(now()).is_none());
1046 }
1047
1048 #[rstest]
1049 fn test_hysteresis_allows_large_movement() {
1050 let strikes = [47500, 50000, 52500];
1051 let mut instruments = HashMap::new();
1052
1053 for s in &strikes {
1054 let strike = Price::from(&s.to_string());
1055 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1056 instruments.insert(call_id, (strike, OptionKind::Call));
1057 }
1058 let tracker = AtmTracker::new();
1059 let mut agg = OptionChainAggregator::new(
1060 make_series_id(),
1061 StrikeRange::AtmRelative {
1062 strikes_above: 1,
1063 strikes_below: 1,
1064 },
1065 tracker,
1066 instruments,
1067 );
1068 agg.set_hysteresis(0.6);
1069 agg.set_cooldown_ns(0);
1070
1071 set_atm_via_greeks(&mut agg, 50000.0);
1073 let action = agg.check_rebalance(now()).unwrap();
1074 agg.apply_rebalance(&action, now());
1075
1076 set_atm_via_greeks(&mut agg, 52000.0);
1078 assert!(agg.check_rebalance(now()).is_some());
1079 }
1080
1081 #[rstest]
1082 fn test_zero_hysteresis_disables_guard() {
1083 let mut agg = make_multi_strike_aggregator();
1084 agg.set_hysteresis(0.0);
1085 agg.set_cooldown_ns(0);
1086
1087 set_atm_via_greeks(&mut agg, 50000.0);
1088 let action = agg.check_rebalance(now()).unwrap();
1089 agg.apply_rebalance(&action, now());
1090
1091 set_atm_via_greeks(&mut agg, 52500.0);
1093 assert!(agg.check_rebalance(now()).is_some());
1094 }
1095
1096 #[rstest]
1099 fn test_cooldown_blocks_rapid_rebalance() {
1100 let mut agg = make_multi_strike_aggregator();
1101 agg.set_hysteresis(0.0);
1102 agg.set_cooldown_ns(5_000_000_000); set_atm_via_greeks(&mut agg, 50000.0);
1105 let t0 = now();
1106 let action = agg.check_rebalance(t0).unwrap();
1107 agg.apply_rebalance(&action, t0);
1108
1109 set_atm_via_greeks(&mut agg, 55000.0);
1111 let t1 = UnixNanos::from(t0.as_u64() + 1_000_000_000); assert!(agg.check_rebalance(t1).is_none());
1113 }
1114
1115 #[rstest]
1116 fn test_cooldown_allows_after_elapsed() {
1117 let mut agg = make_multi_strike_aggregator();
1118 agg.set_hysteresis(0.0);
1119 agg.set_cooldown_ns(5_000_000_000); set_atm_via_greeks(&mut agg, 50000.0);
1122 let t0 = now();
1123 let action = agg.check_rebalance(t0).unwrap();
1124 agg.apply_rebalance(&action, t0);
1125
1126 set_atm_via_greeks(&mut agg, 55000.0);
1128 let t1 = UnixNanos::from(t0.as_u64() + 6_000_000_000); assert!(agg.check_rebalance(t1).is_some());
1130 }
1131
1132 #[rstest]
1133 fn test_zero_cooldown_disables_guard() {
1134 let mut agg = make_multi_strike_aggregator();
1135 agg.set_hysteresis(0.0);
1136 agg.set_cooldown_ns(0);
1137
1138 set_atm_via_greeks(&mut agg, 50000.0);
1139 let t0 = now();
1140 let action = agg.check_rebalance(t0).unwrap();
1141 agg.apply_rebalance(&action, t0);
1142
1143 set_atm_via_greeks(&mut agg, 55000.0);
1145 assert!(agg.check_rebalance(t0).is_some());
1146 }
1147
1148 #[rstest]
1151 fn test_pending_greeks_consumed_on_first_quote() {
1152 let (mut agg, call_id, _) = make_aggregator();
1153
1154 let greeks = OptionGreeks {
1156 instrument_id: call_id,
1157 greeks: OptionGreekValues {
1158 delta: 0.55,
1159 ..Default::default()
1160 },
1161 ..Default::default()
1162 };
1163 agg.update_greeks(&greeks);
1164 assert_eq!(agg.pending_greeks_count(), 1);
1165
1166 let quote = make_quote(call_id, "100.00", "101.00");
1168 agg.update_quote("e);
1169 assert_eq!(agg.pending_greeks_count(), 0);
1170
1171 let strike = Price::from("50000");
1173 let data = agg.get_call_greeks_from_buffer(&strike);
1174 assert!(data.is_some());
1175 assert_eq!(data.unwrap().delta, 0.55);
1176 }
1177
1178 #[rstest]
1181 fn test_snapshot_ts_event_reflects_max_quote_timestamp() {
1182 let (mut agg, call_id, put_id) = make_aggregator();
1183
1184 let quote1 = QuoteTick::new(
1185 call_id,
1186 Price::from("100.00"),
1187 Price::from("101.00"),
1188 Quantity::from("1.0"),
1189 Quantity::from("1.0"),
1190 UnixNanos::from(500u64), UnixNanos::from(500u64),
1192 );
1193 agg.update_quote("e1);
1194
1195 let quote2 = QuoteTick::new(
1196 put_id,
1197 Price::from("50.00"),
1198 Price::from("51.00"),
1199 Quantity::from("1.0"),
1200 Quantity::from("1.0"),
1201 UnixNanos::from(800u64), UnixNanos::from(800u64),
1203 );
1204 agg.update_quote("e2);
1205
1206 let slice = agg.snapshot(UnixNanos::from(1000u64));
1207 assert_eq!(slice.ts_event, UnixNanos::from(800u64));
1208 assert_eq!(slice.ts_init, UnixNanos::from(1000u64));
1209 }
1210
1211 #[rstest]
1212 fn test_snapshot_ts_event_fallback_when_no_quotes() {
1213 let (agg, _, _) = make_aggregator();
1214 let slice = agg.snapshot(UnixNanos::from(1000u64));
1215 assert_eq!(slice.ts_event, UnixNanos::from(1000u64));
1217 }
1218
1219 #[rstest]
1220 fn test_snapshot_retains_buffered_data_during_hysteresis_window() {
1221 let strikes = [47500, 50000, 52500];
1223 let mut instruments = HashMap::new();
1224
1225 for s in &strikes {
1226 let strike = Price::from(&s.to_string());
1227 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1228 instruments.insert(call_id, (strike, OptionKind::Call));
1229 }
1230 let tracker = AtmTracker::new();
1231 let mut agg = OptionChainAggregator::new(
1232 make_series_id(),
1233 StrikeRange::AtmRelative {
1234 strikes_above: 1,
1235 strikes_below: 1,
1236 },
1237 tracker,
1238 instruments,
1239 );
1240 agg.set_hysteresis(0.6);
1241 agg.set_cooldown_ns(0);
1242
1243 set_atm_via_greeks(&mut agg, 50000.0);
1245 let action = agg.check_rebalance(now()).unwrap();
1246 agg.apply_rebalance(&action, now());
1247 assert_eq!(agg.instrument_ids().len(), 3);
1248
1249 let q1 = make_quote(
1251 InstrumentId::from("BTC-20240101-47500-C.DERIBIT"),
1252 "3000.00",
1253 "3100.00",
1254 );
1255 let q2 = make_quote(
1256 InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1257 "1500.00",
1258 "1600.00",
1259 );
1260 let q3 = make_quote(
1261 InstrumentId::from("BTC-20240101-52500-C.DERIBIT"),
1262 "500.00",
1263 "600.00",
1264 );
1265 agg.update_quote(&q1);
1266 agg.update_quote(&q2);
1267 agg.update_quote(&q3);
1268 assert_eq!(agg.call_buffer_len(), 3);
1269
1270 set_atm_via_greeks(&mut agg, 51000.0);
1272 assert!(agg.check_rebalance(now()).is_none());
1273
1274 let slice = agg.snapshot(UnixNanos::from(100u64));
1276 assert_eq!(slice.call_count(), 3);
1277 }
1278
1279 #[rstest]
1280 fn test_remove_instrument_from_catalog() {
1281 let (mut agg, call_id, put_id) = make_aggregator();
1282 assert_eq!(agg.instruments().len(), 2);
1283
1284 let removed = agg.remove_instrument(&call_id);
1285 assert!(removed);
1286 assert_eq!(agg.instruments().len(), 1);
1287 assert!(!agg.active_ids().contains(&call_id));
1288 assert!(agg.instruments().contains_key(&put_id));
1289 }
1290
1291 #[rstest]
1292 fn test_remove_instrument_cleans_buffer() {
1293 let (mut agg, call_id, _) = make_aggregator();
1294 let quote = make_quote(call_id, "100.00", "101.00");
1295 agg.update_quote("e);
1296 assert_eq!(agg.call_buffer_len(), 1);
1297
1298 let _ = agg.remove_instrument(&call_id);
1299 assert_eq!(agg.call_buffer_len(), 0);
1301 }
1302
1303 #[rstest]
1304 fn test_remove_instrument_preserves_sibling_buffer() {
1305 let (mut agg, call_id, _) = make_aggregator();
1306 let sibling_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
1308 let strike = Price::from("50000");
1309 let _ = agg.add_instrument(sibling_id, strike, OptionKind::Call);
1310
1311 let quote = make_quote(call_id, "100.00", "101.00");
1312 agg.update_quote("e);
1313 assert_eq!(agg.call_buffer_len(), 1);
1314
1315 let _ = agg.remove_instrument(&call_id);
1317 assert_eq!(agg.call_buffer_len(), 1); assert!(agg.instruments().contains_key(&sibling_id));
1319 }
1320
1321 #[rstest]
1322 fn test_remove_instrument_unknown_noop() {
1323 let (mut agg, _, _) = make_aggregator();
1324 let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1325 assert!(!agg.remove_instrument(&unknown));
1326 assert_eq!(agg.instruments().len(), 2);
1327 }
1328
1329 #[rstest]
1330 fn test_remove_instrument_cleans_pending_greeks() {
1331 let (mut agg, call_id, _) = make_aggregator();
1332 let greeks = OptionGreeks {
1333 instrument_id: call_id,
1334 greeks: OptionGreekValues {
1335 delta: 0.55,
1336 ..Default::default()
1337 },
1338 ..Default::default()
1339 };
1340 agg.update_greeks(&greeks);
1341 assert_eq!(agg.pending_greeks_count(), 1);
1342
1343 let _ = agg.remove_instrument(&call_id);
1344 assert_eq!(agg.pending_greeks_count(), 0);
1345 }
1346
1347 #[rstest]
1348 fn test_is_catalog_empty_after_full_removal() {
1349 let (mut agg, call_id, put_id) = make_aggregator();
1350 assert!(!agg.is_catalog_empty());
1351
1352 let _ = agg.remove_instrument(&call_id);
1353 assert!(!agg.is_catalog_empty());
1354
1355 let _ = agg.remove_instrument(&put_id);
1356 assert!(agg.is_catalog_empty());
1357 }
1358
1359 #[rstest]
1362 fn test_expired_quote_is_dropped() {
1363 let (mut agg, call_id, _) = make_aggregator();
1364 let expired_quote = QuoteTick::new(
1366 call_id,
1367 Price::from("100.00"),
1368 Price::from("101.00"),
1369 Quantity::from("1.0"),
1370 Quantity::from("1.0"),
1371 UnixNanos::from(1_700_000_000_000_000_000u64),
1372 UnixNanos::from(1_700_000_000_000_000_000u64),
1373 );
1374 agg.update_quote(&expired_quote);
1375 assert!(agg.is_buffer_empty());
1376 }
1377
1378 #[rstest]
1379 fn test_expired_greeks_are_dropped() {
1380 let (mut agg, call_id, _) = make_aggregator();
1381 let quote = make_quote(call_id, "100.00", "101.00");
1383 agg.update_quote("e);
1384 assert_eq!(agg.call_buffer_len(), 1);
1385
1386 let greeks = OptionGreeks {
1388 instrument_id: call_id,
1389 ts_event: UnixNanos::from(1_700_000_000_000_000_000u64),
1390 greeks: OptionGreekValues {
1391 delta: 0.55,
1392 ..Default::default()
1393 },
1394 ..Default::default()
1395 };
1396 agg.update_greeks(&greeks);
1397
1398 let strike = Price::from("50000");
1399 assert!(agg.get_call_greeks_from_buffer(&strike).is_none());
1400 }
1401
1402 fn make_delta_aggregator(
1408 strikes: &[i64],
1409 target: f64,
1410 tolerance: f64,
1411 ) -> OptionChainAggregator {
1412 let mut instruments = HashMap::new();
1413
1414 for s in strikes {
1415 let strike = Price::from(&s.to_string());
1416 instruments.insert(option_id(*s, OptionKind::Call), (strike, OptionKind::Call));
1417 instruments.insert(option_id(*s, OptionKind::Put), (strike, OptionKind::Put));
1418 }
1419 let tracker = AtmTracker::new();
1420 let mut agg = OptionChainAggregator::new(
1421 make_series_id(),
1422 StrikeRange::Delta { target, tolerance },
1423 tracker,
1424 instruments,
1425 );
1426 agg.set_hysteresis(0.0);
1427 agg.set_cooldown_ns(0);
1428 agg
1429 }
1430
1431 fn option_id(strike: i64, kind: OptionKind) -> InstrumentId {
1432 let suffix = match kind {
1433 OptionKind::Call => "C",
1434 OptionKind::Put => "P",
1435 };
1436 InstrumentId::from(&format!("BTC-20240101-{strike}-{suffix}.DERIBIT"))
1437 }
1438
1439 fn feed_quote_and_greeks(
1441 agg: &mut OptionChainAggregator,
1442 strike: i64,
1443 kind: OptionKind,
1444 delta: f64,
1445 ) {
1446 let id = option_id(strike, kind);
1447 agg.update_quote(&make_quote(id, "100.00", "101.00"));
1448 agg.update_greeks(&OptionGreeks {
1449 instrument_id: id,
1450 greeks: OptionGreekValues {
1451 delta,
1452 ..Default::default()
1453 },
1454 ..Default::default()
1455 });
1456 }
1457
1458 #[rstest]
1459 #[case(0.30, 0.30, 0.03, true)] #[case(-0.30, 0.30, 0.03, true)] #[case(0.28, 0.30, 0.03, true)] #[case(0.32, 0.30, 0.03, true)] #[case(0.20, 0.30, 0.03, false)] #[case(0.40, 0.30, 0.03, false)] fn test_delta_within_band(
1466 #[case] delta: f64,
1467 #[case] target: f64,
1468 #[case] tolerance: f64,
1469 #[case] expected: bool,
1470 ) {
1471 assert_eq!(
1472 OptionChainAggregator::delta_within_band(delta, target, tolerance),
1473 expected
1474 );
1475 }
1476
1477 #[rstest]
1478 fn test_delta_target_hit() {
1479 let strikes = [40000, 45000, 50000, 55000, 60000];
1480 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1481 set_atm_via_greeks(&mut agg, 50000.0);
1483 agg.recompute_active_set();
1484 assert_eq!(agg.instrument_ids().len(), 10); feed_quote_and_greeks(&mut agg, 40000, OptionKind::Call, 0.95);
1488 feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.80);
1489 feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.55);
1490 feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1491 feed_quote_and_greeks(&mut agg, 60000, OptionKind::Call, 0.12);
1492
1493 let active = agg.recompute_active_set();
1494 assert_eq!(active.len(), 2); assert!(active.contains(&option_id(55000, OptionKind::Call)));
1496 assert!(active.contains(&option_id(55000, OptionKind::Put)));
1497 }
1498
1499 #[rstest]
1500 fn test_delta_tolerance_band() {
1501 let strikes = [45000, 50000, 55000, 60000];
1502 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.05);
1503 set_atm_via_greeks(&mut agg, 50000.0);
1504 agg.recompute_active_set();
1505
1506 feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.50);
1508 feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.32);
1509 feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1510 feed_quote_and_greeks(&mut agg, 60000, OptionKind::Call, 0.10);
1511
1512 let active = agg.recompute_active_set();
1513 assert_eq!(active.len(), 4); assert!(active.contains(&option_id(50000, OptionKind::Call)));
1515 assert!(active.contains(&option_id(55000, OptionKind::Call)));
1516 assert!(!active.contains(&option_id(45000, OptionKind::Call)));
1517 assert!(!active.contains(&option_id(60000, OptionKind::Call)));
1518 }
1519
1520 #[rstest]
1521 fn test_delta_matches_put_by_magnitude() {
1522 let strikes = [45000, 50000, 55000];
1523 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1524 set_atm_via_greeks(&mut agg, 50000.0);
1525 agg.recompute_active_set();
1526
1527 feed_quote_and_greeks(&mut agg, 45000, OptionKind::Put, -0.30);
1529
1530 let active = agg.recompute_active_set();
1531 assert_eq!(active.len(), 2);
1533 assert!(active.contains(&option_id(45000, OptionKind::Put)));
1534 assert!(active.contains(&option_id(45000, OptionKind::Call)));
1535 }
1536
1537 #[rstest]
1538 fn test_delta_no_greeks_falls_back_to_atm_window() {
1539 let strikes: Vec<i64> = (0..13).map(|i| 40000 + i * 1000).collect();
1541 let mut agg = make_delta_aggregator(&strikes, 0.25, 0.05);
1542 set_atm_via_greeks(&mut agg, 46000.0); let active = agg.recompute_active_set();
1545
1546 assert!(active.len() > 2);
1549 assert!(active.len() < strikes.len() * 2);
1550 assert!(active.contains(&option_id(46000, OptionKind::Call))); assert!(!active.contains(&option_id(40000, OptionKind::Call))); assert!(!active.contains(&option_id(52000, OptionKind::Call))); }
1554
1555 #[rstest]
1556 fn test_delta_pending_only_greeks_eligible() {
1557 let strikes = [45000, 50000, 55000];
1558 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1559 set_atm_via_greeks(&mut agg, 50000.0);
1560 agg.recompute_active_set();
1561
1562 agg.update_greeks(&OptionGreeks {
1564 instrument_id: option_id(55000, OptionKind::Call),
1565 greeks: OptionGreekValues {
1566 delta: 0.30,
1567 ..Default::default()
1568 },
1569 ..Default::default()
1570 });
1571 assert_eq!(agg.pending_greeks_count(), 1);
1572
1573 let active = agg.recompute_active_set();
1574 assert_eq!(active.len(), 2);
1576 assert!(active.contains(&option_id(55000, OptionKind::Call)));
1577 }
1578
1579 #[rstest]
1580 fn test_delta_rebalances_on_greeks_with_atm_unchanged() {
1581 let strikes = [45000, 50000, 55000];
1582 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1583 set_atm_via_greeks(&mut agg, 50000.0);
1584 agg.recompute_active_set();
1585 assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));
1586 assert_eq!(agg.instrument_ids().len(), 6); feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.55);
1590 feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.45);
1591 feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1592
1593 let action = agg
1594 .check_rebalance(now())
1595 .expect("delta range should rebalance when greeks narrow the set");
1596 assert!(action.add.is_empty());
1597 assert!(!action.remove.is_empty());
1598
1599 agg.apply_rebalance(&action, now());
1600 assert_eq!(agg.instrument_ids().len(), 2); assert!(
1602 agg.active_ids()
1603 .contains(&option_id(55000, OptionKind::Call))
1604 );
1605 }
1606
1607 #[rstest]
1608 fn test_delta_no_op_rebalance_returns_none() {
1609 let strikes = [45000, 50000, 55000];
1610 let mut agg = make_delta_aggregator(&strikes, 0.30, 0.03);
1611 set_atm_via_greeks(&mut agg, 50000.0);
1612 agg.recompute_active_set();
1613 feed_quote_and_greeks(&mut agg, 45000, OptionKind::Call, 0.55);
1614 feed_quote_and_greeks(&mut agg, 50000, OptionKind::Call, 0.45);
1615 feed_quote_and_greeks(&mut agg, 55000, OptionKind::Call, 0.30);
1616
1617 let action = agg.check_rebalance(now()).unwrap();
1619 agg.apply_rebalance(&action, now());
1620 assert_eq!(agg.instrument_ids().len(), 2);
1621
1622 assert!(agg.check_rebalance(now()).is_none());
1624 }
1625}