1use std::collections::{BTreeMap, HashMap, HashSet};
19
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 QuoteTick,
24 option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange},
25 },
26 enums::OptionKind,
27 identifiers::{InstrumentId, OptionSeriesId},
28 types::Price,
29};
30
31use super::{
32 AtmTracker,
33 constants::{DEFAULT_REBALANCE_COOLDOWN_NS, DEFAULT_REBALANCE_HYSTERESIS},
34};
35
36#[derive(Debug)]
41pub struct OptionChainAggregator {
42 series_id: OptionSeriesId,
44 strike_range: StrikeRange,
46 atm_tracker: AtmTracker,
48 instruments: HashMap<InstrumentId, (Price, OptionKind)>,
51 active_ids: HashSet<InstrumentId>,
53 last_atm_strike: Option<Price>,
55 hysteresis: f64,
57 cooldown_ns: u64,
59 last_rebalance_ns: Option<UnixNanos>,
61 max_ts_event: UnixNanos,
63 pending_greeks: HashMap<InstrumentId, OptionGreeks>,
65 call_buffer: BTreeMap<Price, OptionStrikeData>,
67 put_buffer: BTreeMap<Price, OptionStrikeData>,
69}
70
71impl OptionChainAggregator {
72 pub fn new(
79 series_id: OptionSeriesId,
80 strike_range: StrikeRange,
81 atm_tracker: AtmTracker,
82 instruments: HashMap<InstrumentId, (Price, OptionKind)>,
83 ) -> Self {
84 let all_strikes = Self::sorted_strikes(&instruments);
85 let atm_price = atm_tracker.atm_price();
86 let active_strikes: HashSet<Price> = strike_range
87 .resolve(atm_price, &all_strikes)
88 .into_iter()
89 .collect();
90 let active_ids: HashSet<InstrumentId> = instruments
91 .iter()
92 .filter(|(_, (strike, _))| active_strikes.contains(strike))
93 .map(|(id, _)| *id)
94 .collect();
95 let last_atm_strike =
96 atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
97
98 Self {
99 series_id,
100 strike_range,
101 atm_tracker,
102 instruments,
103 active_ids,
104 last_atm_strike,
105 hysteresis: DEFAULT_REBALANCE_HYSTERESIS,
106 cooldown_ns: DEFAULT_REBALANCE_COOLDOWN_NS,
107 last_rebalance_ns: None,
108 max_ts_event: UnixNanos::default(),
109 pending_greeks: HashMap::new(),
110 call_buffer: BTreeMap::new(),
111 put_buffer: BTreeMap::new(),
112 }
113 }
114
115 pub fn atm_tracker_mut(&mut self) -> &mut AtmTracker {
117 &mut self.atm_tracker
118 }
119
120 #[must_use]
122 pub fn instrument_ids(&self) -> Vec<InstrumentId> {
123 self.active_ids.iter().copied().collect()
124 }
125
126 #[must_use]
128 pub fn active_ids(&self) -> &HashSet<InstrumentId> {
129 &self.active_ids
130 }
131
132 #[must_use]
134 pub fn series_id(&self) -> OptionSeriesId {
135 self.series_id
136 }
137
138 #[must_use]
140 pub fn is_expired(&self, now_ns: UnixNanos) -> bool {
141 now_ns >= self.series_id.expiration_ns
142 }
143
144 #[must_use]
146 pub fn instruments(&self) -> &HashMap<InstrumentId, (Price, OptionKind)> {
147 &self.instruments
148 }
149
150 #[must_use]
152 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
153 self.instruments.keys().copied().collect()
154 }
155
156 #[must_use]
158 pub fn is_catalog_empty(&self) -> bool {
159 self.instruments.is_empty()
160 }
161
162 #[must_use]
168 pub fn remove_instrument(&mut self, instrument_id: &InstrumentId) -> bool {
169 let Some((strike, kind)) = self.instruments.remove(instrument_id) else {
170 return false;
171 };
172
173 self.active_ids.remove(instrument_id);
174 self.pending_greeks.remove(instrument_id);
175
176 let has_sibling = self
178 .instruments
179 .values()
180 .any(|(s, k)| *s == strike && *k == kind);
181
182 if !has_sibling {
183 let buffer = match kind {
184 OptionKind::Call => &mut self.call_buffer,
185 OptionKind::Put => &mut self.put_buffer,
186 };
187 buffer.remove(&strike);
188 }
189
190 true
191 }
192
193 #[must_use]
195 pub fn atm_tracker(&self) -> &AtmTracker {
196 &self.atm_tracker
197 }
198
199 pub fn recompute_active_set(&mut self) -> Vec<InstrumentId> {
204 let atm_price = self.atm_tracker.atm_price();
205 let all_strikes = Self::sorted_strikes(&self.instruments);
206 let active_strikes: HashSet<Price> = self
207 .strike_range
208 .resolve(atm_price, &all_strikes)
209 .into_iter()
210 .collect();
211 self.active_ids = self
212 .instruments
213 .iter()
214 .filter(|(_, (strike, _))| active_strikes.contains(strike))
215 .map(|(id, _)| *id)
216 .collect();
217 self.last_atm_strike =
218 atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
219 self.active_ids.iter().copied().collect()
220 }
221
222 #[must_use]
229 pub fn add_instrument(
230 &mut self,
231 instrument_id: InstrumentId,
232 strike: Price,
233 kind: OptionKind,
234 ) -> bool {
235 if self.instruments.contains_key(&instrument_id) {
236 return false;
237 }
238
239 self.instruments.insert(instrument_id, (strike, kind));
240
241 let all_strikes = Self::sorted_strikes(&self.instruments);
243 let atm_price = self.atm_tracker.atm_price();
244 let active_strikes: HashSet<Price> = self
245 .strike_range
246 .resolve(atm_price, &all_strikes)
247 .into_iter()
248 .collect();
249
250 if active_strikes.contains(&strike) {
251 self.active_ids.insert(instrument_id);
252 }
253
254 true
255 }
256
257 fn sorted_strikes(instruments: &HashMap<InstrumentId, (Price, OptionKind)>) -> Vec<Price> {
259 let mut strikes: Vec<Price> = instruments.values().map(|(s, _)| *s).collect();
260 strikes.sort();
261 strikes.dedup();
262 strikes
263 }
264
265 fn find_closest_strike(all_strikes: &[Price], atm: Price) -> Option<Price> {
267 all_strikes
268 .iter()
269 .min_by(|a, b| {
270 let da = (a.as_f64() - atm.as_f64()).abs();
271 let db = (b.as_f64() - atm.as_f64()).abs();
272 da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
273 })
274 .copied()
275 }
276
277 pub fn update_quote(&mut self, quote: &QuoteTick) {
279 if self.is_expired(quote.ts_event) {
280 log::warn!(
281 "Dropping quote for {}, series {} expired at {}",
282 quote.instrument_id,
283 self.series_id,
284 self.series_id.expiration_ns,
285 );
286 return;
287 }
288
289 if !self.active_ids.contains("e.instrument_id) {
290 return;
291 }
292
293 if let Some(&(strike, kind)) = self.instruments.get("e.instrument_id) {
294 if quote.ts_event > self.max_ts_event {
296 self.max_ts_event = quote.ts_event;
297 }
298
299 let buffer = match kind {
300 OptionKind::Call => &mut self.call_buffer,
301 OptionKind::Put => &mut self.put_buffer,
302 };
303 match buffer.get_mut(&strike) {
304 Some(data) => data.quote = *quote,
305 None => {
306 let greeks = self.pending_greeks.remove("e.instrument_id);
308 buffer.insert(
309 strike,
310 OptionStrikeData {
311 quote: *quote,
312 greeks,
313 },
314 );
315 }
316 }
317 }
318 }
319
320 pub fn update_greeks(&mut self, greeks: &OptionGreeks) {
326 if self.is_expired(greeks.ts_event) {
327 log::warn!(
328 "Dropping greeks for {}, series {} expired at {}",
329 greeks.instrument_id,
330 self.series_id,
331 self.series_id.expiration_ns,
332 );
333 return;
334 }
335
336 if !self.active_ids.contains(&greeks.instrument_id) {
337 return;
338 }
339
340 if let Some(&(strike, kind)) = self.instruments.get(&greeks.instrument_id) {
341 let buffer = match kind {
342 OptionKind::Call => &mut self.call_buffer,
343 OptionKind::Put => &mut self.put_buffer,
344 };
345 match buffer.get_mut(&strike) {
346 Some(data) => data.greeks = Some(*greeks),
347 None => {
348 self.pending_greeks.insert(greeks.instrument_id, *greeks);
350 }
351 }
352 }
353 }
354
355 pub fn snapshot(&self, ts_init: UnixNanos) -> OptionChainSlice {
364 let atm_price = self.atm_tracker.atm_price();
365
366 let catalog_strikes = Self::sorted_strikes(&self.instruments);
368 let atm_strike = atm_price.and_then(|atm| Self::find_closest_strike(&catalog_strikes, atm));
369
370 let active_strikes: HashSet<Price> = self
374 .active_ids
375 .iter()
376 .filter_map(|id| self.instruments.get(id).map(|(s, _)| *s))
377 .collect();
378
379 let mut calls = BTreeMap::new();
381 for (strike, data) in &self.call_buffer {
382 if active_strikes.contains(strike) {
383 calls.insert(*strike, data.clone());
384 }
385 }
386 let mut puts = BTreeMap::new();
387 for (strike, data) in &self.put_buffer {
388 if active_strikes.contains(strike) {
389 puts.insert(*strike, data.clone());
390 }
391 }
392
393 let ts_event = if self.max_ts_event == UnixNanos::default() {
395 ts_init
396 } else {
397 self.max_ts_event
398 };
399
400 OptionChainSlice {
401 series_id: self.series_id,
402 atm_strike,
403 calls,
404 puts,
405 ts_event,
406 ts_init,
407 }
408 }
409
410 #[must_use]
412 pub fn is_buffer_empty(&self) -> bool {
413 self.call_buffer.is_empty() && self.put_buffer.is_empty()
414 }
415
416 #[must_use]
423 pub fn check_rebalance(&self, now_ns: UnixNanos) -> Option<RebalanceAction> {
424 if matches!(self.strike_range, StrikeRange::Fixed(_)) {
426 return None;
427 }
428
429 let atm_price = self.atm_tracker.atm_price()?;
430 let all_strikes = Self::sorted_strikes(&self.instruments);
431 let current_atm_strike = Self::find_closest_strike(&all_strikes, atm_price)?;
432
433 if self.last_atm_strike == Some(current_atm_strike) {
435 return None;
436 }
437
438 if let Some(last_strike) = self.last_atm_strike
440 && self.hysteresis > 0.0
441 {
442 let last_f = last_strike.as_f64();
443 let atm_f = atm_price.as_f64();
444 let direction = atm_f - last_f;
445
446 let next_strike = if direction > 0.0 {
448 all_strikes.iter().find(|s| s.as_f64() > last_f)
449 } else {
450 all_strikes.iter().rev().find(|s| s.as_f64() < last_f)
451 };
452
453 if let Some(next) = next_strike {
454 let gap = (next.as_f64() - last_f).abs();
455 let threshold = last_f + direction.signum() * self.hysteresis * gap;
456 if direction > 0.0 && atm_f < threshold {
458 return None;
459 }
460
461 if direction < 0.0 && atm_f > threshold {
462 return None;
463 }
464 }
465 }
466
467 if self.cooldown_ns > 0
469 && let Some(last_ts) = self.last_rebalance_ns
470 && now_ns.as_u64().saturating_sub(last_ts.as_u64()) < self.cooldown_ns
471 {
472 return None;
473 }
474
475 let new_active_strikes: HashSet<Price> = self
477 .strike_range
478 .resolve(Some(atm_price), &all_strikes)
479 .into_iter()
480 .collect();
481 let new_active: HashSet<InstrumentId> = self
482 .instruments
483 .iter()
484 .filter(|(_, (s, _))| new_active_strikes.contains(s))
485 .map(|(id, _)| *id)
486 .collect();
487
488 let add = new_active.difference(&self.active_ids).copied().collect();
489 let remove = self.active_ids.difference(&new_active).copied().collect();
490
491 Some(RebalanceAction { add, remove })
492 }
493
494 pub fn apply_rebalance(&mut self, action: &RebalanceAction, now_ns: UnixNanos) {
497 for id in &action.add {
498 self.active_ids.insert(*id);
499 }
500 for id in &action.remove {
501 self.active_ids.remove(id);
502 }
503
504 let active_strikes: HashSet<Price> = self
506 .active_ids
507 .iter()
508 .filter_map(|id| self.instruments.get(id))
509 .map(|(s, _)| *s)
510 .collect();
511 self.call_buffer
512 .retain(|strike, _| active_strikes.contains(strike));
513 self.put_buffer
514 .retain(|strike, _| active_strikes.contains(strike));
515 self.pending_greeks
516 .retain(|id, _| self.active_ids.contains(id));
517
518 if let Some(atm) = self.atm_tracker.atm_price() {
520 let all_strikes = Self::sorted_strikes(&self.instruments);
521 self.last_atm_strike = Self::find_closest_strike(&all_strikes, atm);
522 }
523 self.last_rebalance_ns = Some(now_ns);
524 }
525}
526
527#[derive(Clone, Debug, PartialEq, Eq)]
529pub struct RebalanceAction {
530 pub add: Vec<InstrumentId>,
532 pub remove: Vec<InstrumentId>,
534}
535
536#[cfg(test)]
537impl OptionChainAggregator {
538 fn call_buffer_len(&self) -> usize {
539 self.call_buffer.len()
540 }
541
542 fn put_buffer_len(&self) -> usize {
543 self.put_buffer.len()
544 }
545
546 fn get_call_greeks_from_buffer(&self, strike: &Price) -> Option<&OptionGreeks> {
547 self.call_buffer.get(strike).and_then(|d| d.greeks.as_ref())
548 }
549
550 pub(crate) fn last_atm_strike(&self) -> Option<Price> {
551 self.last_atm_strike
552 }
553
554 fn set_hysteresis(&mut self, h: f64) {
555 self.hysteresis = h;
556 }
557
558 fn set_cooldown_ns(&mut self, ns: u64) {
559 self.cooldown_ns = ns;
560 }
561
562 fn pending_greeks_count(&self) -> usize {
563 self.pending_greeks.len()
564 }
565}
566
567#[cfg(test)]
568mod tests {
569 use nautilus_model::{data::greeks::OptionGreekValues, identifiers::Venue, types::Quantity};
570 use rstest::*;
571
572 use super::*;
573
574 fn make_series_id() -> OptionSeriesId {
575 OptionSeriesId::new(
576 Venue::new("DERIBIT"),
577 ustr::Ustr::from("BTC"),
578 ustr::Ustr::from("BTC"),
579 UnixNanos::from(1_700_000_000_000_000_000u64),
580 )
581 }
582
583 fn make_quote(instrument_id: InstrumentId, bid: &str, ask: &str) -> QuoteTick {
584 QuoteTick::new(
585 instrument_id,
586 Price::from(bid),
587 Price::from(ask),
588 Quantity::from("1.0"),
589 Quantity::from("1.0"),
590 UnixNanos::from(1u64),
591 UnixNanos::from(1u64),
592 )
593 }
594
595 fn now() -> UnixNanos {
596 UnixNanos::from(1_000_000_000_000_000_000u64)
598 }
599
600 fn set_atm_via_greeks(agg: &mut OptionChainAggregator, price: f64) {
602 let greeks = OptionGreeks {
603 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
604 underlying_price: Some(price),
605 ..Default::default()
606 };
607 agg.atm_tracker_mut().update_from_option_greeks(&greeks);
608 }
609
610 fn make_aggregator() -> (OptionChainAggregator, InstrumentId, InstrumentId) {
611 let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
612 let put_id = InstrumentId::from("BTC-20240101-50000-P.DERIBIT");
613 let strike = Price::from("50000");
614
615 let mut instrument_map = HashMap::new();
616 instrument_map.insert(call_id, (strike, OptionKind::Call));
617 instrument_map.insert(put_id, (strike, OptionKind::Put));
618
619 let tracker = AtmTracker::new();
620 let agg = OptionChainAggregator::new(
621 make_series_id(),
622 StrikeRange::Fixed(vec![strike]),
623 tracker,
624 instrument_map,
625 );
626
627 (agg, call_id, put_id)
628 }
629
630 #[rstest]
631 fn test_aggregator_instrument_ids() {
632 let (agg, call_id, put_id) = make_aggregator();
633 let ids = agg.instrument_ids();
634 assert_eq!(ids.len(), 2);
635 assert!(ids.contains(&call_id));
636 assert!(ids.contains(&put_id));
637 }
638
639 #[rstest]
640 fn test_aggregator_update_quote() {
641 let (mut agg, call_id, _) = make_aggregator();
642 let quote = make_quote(call_id, "100.00", "101.00");
643
644 agg.update_quote("e);
645
646 assert_eq!(agg.call_buffer_len(), 1);
647 assert_eq!(agg.put_buffer_len(), 0);
648 }
649
650 #[rstest]
651 fn test_aggregator_update_greeks() {
652 let (mut agg, call_id, _) = make_aggregator();
653 let quote = make_quote(call_id, "100.00", "101.00");
654 agg.update_quote("e);
655
656 let greeks = OptionGreeks {
657 instrument_id: call_id,
658 greeks: OptionGreekValues {
659 delta: 0.55,
660 ..Default::default()
661 },
662 ..Default::default()
663 };
664 agg.update_greeks(&greeks);
665
666 let strike = Price::from("50000");
667 let data = agg.get_call_greeks_from_buffer(&strike);
668 assert!(data.is_some());
669 assert_eq!(data.unwrap().delta, 0.55);
670 }
671
672 #[rstest]
673 fn test_aggregator_snapshot_preserves_state() {
674 let (mut agg, call_id, _) = make_aggregator();
675 let quote = make_quote(call_id, "100.00", "101.00");
676 agg.update_quote("e);
677
678 let slice = agg.snapshot(UnixNanos::from(100u64));
679 assert_eq!(slice.call_count(), 1);
680 assert_eq!(slice.ts_init, UnixNanos::from(100u64));
681
682 assert!(!agg.is_buffer_empty());
684
685 let slice2 = agg.snapshot(UnixNanos::from(200u64));
687 assert_eq!(slice2.call_count(), 1);
688 assert_eq!(slice2.ts_init, UnixNanos::from(200u64));
689 }
690
691 #[rstest]
692 fn test_aggregator_ignores_unknown_instrument() {
693 let (mut agg, _, _) = make_aggregator();
694 let unknown_id = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
695 let quote = make_quote(unknown_id, "100.00", "101.00");
696
697 agg.update_quote("e);
698
699 assert!(agg.is_buffer_empty());
700 }
701
702 #[rstest]
703 fn test_check_rebalance_returns_none() {
704 let (agg, _, _) = make_aggregator();
705 assert!(agg.check_rebalance(now()).is_none());
706 }
707
708 fn make_multi_strike_aggregator() -> OptionChainAggregator {
713 let strikes = [45000, 47500, 50000, 52500, 55000];
714 let mut instruments = HashMap::new();
715 for s in &strikes {
716 let strike = Price::from(&s.to_string());
717 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
718 let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
719 instruments.insert(call_id, (strike, OptionKind::Call));
720 instruments.insert(put_id, (strike, OptionKind::Put));
721 }
722
723 let tracker = AtmTracker::new();
724 let mut agg = OptionChainAggregator::new(
725 make_series_id(),
726 StrikeRange::AtmRelative {
727 strikes_above: 1,
728 strikes_below: 1,
729 },
730 tracker,
731 instruments,
732 );
733 agg.set_hysteresis(0.0);
735 agg.set_cooldown_ns(0);
736 agg
737 }
738
739 #[rstest]
740 fn test_check_rebalance_fixed_always_none() {
741 let (mut agg, _, _) = make_aggregator();
743 set_atm_via_greeks(&mut agg, 50000.0);
744 assert!(agg.check_rebalance(now()).is_none());
745 }
746
747 #[rstest]
748 fn test_check_rebalance_no_atm_returns_none() {
749 let agg = make_multi_strike_aggregator();
750 assert!(agg.check_rebalance(now()).is_none());
752 }
753
754 #[rstest]
755 fn test_check_rebalance_atm_unchanged_returns_none() {
756 let mut agg = make_multi_strike_aggregator();
757 set_atm_via_greeks(&mut agg, 50000.0);
759 let action = agg.check_rebalance(now()).unwrap();
761 agg.apply_rebalance(&action, now());
762
763 set_atm_via_greeks(&mut agg, 50200.0);
765 assert!(agg.check_rebalance(now()).is_none());
766 }
767
768 #[rstest]
769 fn test_check_rebalance_detects_atm_shift() {
770 let mut agg = make_multi_strike_aggregator();
771 set_atm_via_greeks(&mut agg, 50000.0);
773 let action = agg.check_rebalance(now()).unwrap();
774 agg.apply_rebalance(&action, now());
775 assert_eq!(agg.instrument_ids().len(), 6); set_atm_via_greeks(&mut agg, 55000.0);
780 let action2 = agg.check_rebalance(now()).unwrap();
781 assert!(!action2.add.is_empty() || !action2.remove.is_empty());
783 }
784
785 #[rstest]
786 fn test_apply_rebalance_updates_instrument_map() {
787 let mut agg = make_multi_strike_aggregator();
788 set_atm_via_greeks(&mut agg, 50000.0);
790 let action = agg.check_rebalance(now()).unwrap();
791 agg.apply_rebalance(&action, now());
792
793 let active_ids = agg.instrument_ids();
795 assert_eq!(active_ids.len(), 6); set_atm_via_greeks(&mut agg, 55000.0);
799 let action2 = agg.check_rebalance(now()).unwrap();
800 agg.apply_rebalance(&action2, now());
801
802 let active_ids2 = agg.instrument_ids();
804 assert_eq!(active_ids2.len(), 4); }
806
807 #[rstest]
808 fn test_apply_rebalance_cleans_buffers() {
809 let mut agg = make_multi_strike_aggregator();
810 set_atm_via_greeks(&mut agg, 50000.0);
812 let action = agg.check_rebalance(now()).unwrap();
813 agg.apply_rebalance(&action, now());
814
815 let call_47500 = InstrumentId::from("BTC-20240101-47500-C.DERIBIT");
817 let quote = make_quote(call_47500, "100.00", "101.00");
818 agg.update_quote("e);
819 assert_eq!(agg.call_buffer_len(), 1);
820
821 set_atm_via_greeks(&mut agg, 55000.0);
823 let action2 = agg.check_rebalance(now()).unwrap();
824 agg.apply_rebalance(&action2, now());
825
826 assert_eq!(agg.call_buffer_len(), 0);
828 }
829
830 #[rstest]
831 fn test_initial_active_set_empty_when_no_atm() {
832 let agg = make_multi_strike_aggregator();
833 assert_eq!(agg.instrument_ids().len(), 0);
835 assert_eq!(agg.all_instrument_ids().len(), 10);
836 }
837
838 #[rstest]
839 fn test_catalog_vs_active_separation() {
840 let mut agg = make_multi_strike_aggregator();
841 set_atm_via_greeks(&mut agg, 50000.0);
843 let action = agg.check_rebalance(now()).unwrap();
844 agg.apply_rebalance(&action, now());
845
846 assert_eq!(agg.instruments().len(), 10);
848 assert_eq!(agg.instrument_ids().len(), 6);
850 }
851
852 #[rstest]
855 fn test_add_instrument_already_known() {
856 let (mut agg, call_id, _) = make_aggregator();
857 let strike = Price::from("50000");
858 let count_before = agg.instruments().len();
859
860 let result = agg.add_instrument(call_id, strike, OptionKind::Call);
861
862 assert!(!result);
863 assert_eq!(agg.instruments().len(), count_before);
864 }
865
866 #[rstest]
867 fn test_add_instrument_new_in_active_range() {
868 let (mut agg, _, _) = make_aggregator();
869 let new_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
871 let strike = Price::from("50000");
872
873 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
874
875 assert!(result);
876 assert_eq!(agg.instruments().len(), 3);
877 assert!(agg.active_ids().contains(&new_id));
878 }
879
880 #[rstest]
881 fn test_add_instrument_new_out_of_range() {
882 let (mut agg, _, _) = make_aggregator();
883 let new_id = InstrumentId::from("BTC-20240101-60000-C.DERIBIT");
885 let strike = Price::from("60000");
886
887 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
888
889 assert!(result);
890 assert_eq!(agg.instruments().len(), 3);
891 assert!(!agg.active_ids().contains(&new_id));
892 }
893
894 #[rstest]
895 fn test_add_instrument_available_for_rebalance() {
896 let mut agg = make_multi_strike_aggregator();
897 set_atm_via_greeks(&mut agg, 50000.0);
899 let action = agg.check_rebalance(now()).unwrap();
900 agg.apply_rebalance(&action, now());
901 assert_eq!(agg.instrument_ids().len(), 6);
903
904 let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
906 let strike = Price::from("57500");
907 let result = agg.add_instrument(new_id, strike, OptionKind::Call);
908 assert!(result);
909 assert!(!agg.active_ids().contains(&new_id));
910
911 set_atm_via_greeks(&mut agg, 57500.0);
913 let action2 = agg.check_rebalance(now()).unwrap();
914 agg.apply_rebalance(&action2, now());
915
916 assert!(agg.active_ids().contains(&new_id));
917 }
918
919 #[rstest]
922 fn test_hysteresis_blocks_small_movement() {
923 let strikes = [47500, 50000, 52500];
924 let mut instruments = HashMap::new();
925 for s in &strikes {
926 let strike = Price::from(&s.to_string());
927 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
928 instruments.insert(call_id, (strike, OptionKind::Call));
929 }
930 let tracker = AtmTracker::new();
931 let mut agg = OptionChainAggregator::new(
932 make_series_id(),
933 StrikeRange::AtmRelative {
934 strikes_above: 1,
935 strikes_below: 1,
936 },
937 tracker,
938 instruments,
939 );
940 agg.set_hysteresis(0.6);
941 agg.set_cooldown_ns(0);
942
943 set_atm_via_greeks(&mut agg, 50000.0);
945 let action = agg.check_rebalance(now()).unwrap();
946 agg.apply_rebalance(&action, now());
947 assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));
948
949 set_atm_via_greeks(&mut agg, 51000.0);
952 assert!(agg.check_rebalance(now()).is_none());
953 }
954
955 #[rstest]
956 fn test_hysteresis_allows_large_movement() {
957 let strikes = [47500, 50000, 52500];
958 let mut instruments = HashMap::new();
959 for s in &strikes {
960 let strike = Price::from(&s.to_string());
961 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
962 instruments.insert(call_id, (strike, OptionKind::Call));
963 }
964 let tracker = AtmTracker::new();
965 let mut agg = OptionChainAggregator::new(
966 make_series_id(),
967 StrikeRange::AtmRelative {
968 strikes_above: 1,
969 strikes_below: 1,
970 },
971 tracker,
972 instruments,
973 );
974 agg.set_hysteresis(0.6);
975 agg.set_cooldown_ns(0);
976
977 set_atm_via_greeks(&mut agg, 50000.0);
979 let action = agg.check_rebalance(now()).unwrap();
980 agg.apply_rebalance(&action, now());
981
982 set_atm_via_greeks(&mut agg, 52000.0);
984 assert!(agg.check_rebalance(now()).is_some());
985 }
986
987 #[rstest]
988 fn test_zero_hysteresis_disables_guard() {
989 let mut agg = make_multi_strike_aggregator();
990 agg.set_hysteresis(0.0);
991 agg.set_cooldown_ns(0);
992
993 set_atm_via_greeks(&mut agg, 50000.0);
994 let action = agg.check_rebalance(now()).unwrap();
995 agg.apply_rebalance(&action, now());
996
997 set_atm_via_greeks(&mut agg, 52500.0);
999 assert!(agg.check_rebalance(now()).is_some());
1000 }
1001
1002 #[rstest]
1005 fn test_cooldown_blocks_rapid_rebalance() {
1006 let mut agg = make_multi_strike_aggregator();
1007 agg.set_hysteresis(0.0);
1008 agg.set_cooldown_ns(5_000_000_000); set_atm_via_greeks(&mut agg, 50000.0);
1011 let t0 = now();
1012 let action = agg.check_rebalance(t0).unwrap();
1013 agg.apply_rebalance(&action, t0);
1014
1015 set_atm_via_greeks(&mut agg, 55000.0);
1017 let t1 = UnixNanos::from(t0.as_u64() + 1_000_000_000); assert!(agg.check_rebalance(t1).is_none());
1019 }
1020
1021 #[rstest]
1022 fn test_cooldown_allows_after_elapsed() {
1023 let mut agg = make_multi_strike_aggregator();
1024 agg.set_hysteresis(0.0);
1025 agg.set_cooldown_ns(5_000_000_000); set_atm_via_greeks(&mut agg, 50000.0);
1028 let t0 = now();
1029 let action = agg.check_rebalance(t0).unwrap();
1030 agg.apply_rebalance(&action, t0);
1031
1032 set_atm_via_greeks(&mut agg, 55000.0);
1034 let t1 = UnixNanos::from(t0.as_u64() + 6_000_000_000); assert!(agg.check_rebalance(t1).is_some());
1036 }
1037
1038 #[rstest]
1039 fn test_zero_cooldown_disables_guard() {
1040 let mut agg = make_multi_strike_aggregator();
1041 agg.set_hysteresis(0.0);
1042 agg.set_cooldown_ns(0);
1043
1044 set_atm_via_greeks(&mut agg, 50000.0);
1045 let t0 = now();
1046 let action = agg.check_rebalance(t0).unwrap();
1047 agg.apply_rebalance(&action, t0);
1048
1049 set_atm_via_greeks(&mut agg, 55000.0);
1051 assert!(agg.check_rebalance(t0).is_some());
1052 }
1053
1054 #[rstest]
1057 fn test_pending_greeks_consumed_on_first_quote() {
1058 let (mut agg, call_id, _) = make_aggregator();
1059
1060 let greeks = OptionGreeks {
1062 instrument_id: call_id,
1063 greeks: OptionGreekValues {
1064 delta: 0.55,
1065 ..Default::default()
1066 },
1067 ..Default::default()
1068 };
1069 agg.update_greeks(&greeks);
1070 assert_eq!(agg.pending_greeks_count(), 1);
1071
1072 let quote = make_quote(call_id, "100.00", "101.00");
1074 agg.update_quote("e);
1075 assert_eq!(agg.pending_greeks_count(), 0);
1076
1077 let strike = Price::from("50000");
1079 let data = agg.get_call_greeks_from_buffer(&strike);
1080 assert!(data.is_some());
1081 assert_eq!(data.unwrap().delta, 0.55);
1082 }
1083
1084 #[rstest]
1087 fn test_snapshot_ts_event_reflects_max_quote_timestamp() {
1088 let (mut agg, call_id, put_id) = make_aggregator();
1089
1090 let quote1 = QuoteTick::new(
1091 call_id,
1092 Price::from("100.00"),
1093 Price::from("101.00"),
1094 Quantity::from("1.0"),
1095 Quantity::from("1.0"),
1096 UnixNanos::from(500u64), UnixNanos::from(500u64),
1098 );
1099 agg.update_quote("e1);
1100
1101 let quote2 = QuoteTick::new(
1102 put_id,
1103 Price::from("50.00"),
1104 Price::from("51.00"),
1105 Quantity::from("1.0"),
1106 Quantity::from("1.0"),
1107 UnixNanos::from(800u64), UnixNanos::from(800u64),
1109 );
1110 agg.update_quote("e2);
1111
1112 let slice = agg.snapshot(UnixNanos::from(1000u64));
1113 assert_eq!(slice.ts_event, UnixNanos::from(800u64));
1114 assert_eq!(slice.ts_init, UnixNanos::from(1000u64));
1115 }
1116
1117 #[rstest]
1118 fn test_snapshot_ts_event_fallback_when_no_quotes() {
1119 let (agg, _, _) = make_aggregator();
1120 let slice = agg.snapshot(UnixNanos::from(1000u64));
1121 assert_eq!(slice.ts_event, UnixNanos::from(1000u64));
1123 }
1124
1125 #[rstest]
1126 fn test_snapshot_retains_buffered_data_during_hysteresis_window() {
1127 let strikes = [47500, 50000, 52500];
1129 let mut instruments = HashMap::new();
1130 for s in &strikes {
1131 let strike = Price::from(&s.to_string());
1132 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
1133 instruments.insert(call_id, (strike, OptionKind::Call));
1134 }
1135 let tracker = AtmTracker::new();
1136 let mut agg = OptionChainAggregator::new(
1137 make_series_id(),
1138 StrikeRange::AtmRelative {
1139 strikes_above: 1,
1140 strikes_below: 1,
1141 },
1142 tracker,
1143 instruments,
1144 );
1145 agg.set_hysteresis(0.6);
1146 agg.set_cooldown_ns(0);
1147
1148 set_atm_via_greeks(&mut agg, 50000.0);
1150 let action = agg.check_rebalance(now()).unwrap();
1151 agg.apply_rebalance(&action, now());
1152 assert_eq!(agg.instrument_ids().len(), 3);
1153
1154 let q1 = make_quote(
1156 InstrumentId::from("BTC-20240101-47500-C.DERIBIT"),
1157 "3000.00",
1158 "3100.00",
1159 );
1160 let q2 = make_quote(
1161 InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1162 "1500.00",
1163 "1600.00",
1164 );
1165 let q3 = make_quote(
1166 InstrumentId::from("BTC-20240101-52500-C.DERIBIT"),
1167 "500.00",
1168 "600.00",
1169 );
1170 agg.update_quote(&q1);
1171 agg.update_quote(&q2);
1172 agg.update_quote(&q3);
1173 assert_eq!(agg.call_buffer_len(), 3);
1174
1175 set_atm_via_greeks(&mut agg, 51000.0);
1177 assert!(agg.check_rebalance(now()).is_none());
1178
1179 let slice = agg.snapshot(UnixNanos::from(100u64));
1181 assert_eq!(slice.call_count(), 3);
1182 }
1183
1184 #[rstest]
1185 fn test_remove_instrument_from_catalog() {
1186 let (mut agg, call_id, put_id) = make_aggregator();
1187 assert_eq!(agg.instruments().len(), 2);
1188
1189 let removed = agg.remove_instrument(&call_id);
1190 assert!(removed);
1191 assert_eq!(agg.instruments().len(), 1);
1192 assert!(!agg.active_ids().contains(&call_id));
1193 assert!(agg.instruments().contains_key(&put_id));
1194 }
1195
1196 #[rstest]
1197 fn test_remove_instrument_cleans_buffer() {
1198 let (mut agg, call_id, _) = make_aggregator();
1199 let quote = make_quote(call_id, "100.00", "101.00");
1200 agg.update_quote("e);
1201 assert_eq!(agg.call_buffer_len(), 1);
1202
1203 let _ = agg.remove_instrument(&call_id);
1204 assert_eq!(agg.call_buffer_len(), 0);
1206 }
1207
1208 #[rstest]
1209 fn test_remove_instrument_preserves_sibling_buffer() {
1210 let (mut agg, call_id, _) = make_aggregator();
1211 let sibling_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
1213 let strike = Price::from("50000");
1214 let _ = agg.add_instrument(sibling_id, strike, OptionKind::Call);
1215
1216 let quote = make_quote(call_id, "100.00", "101.00");
1217 agg.update_quote("e);
1218 assert_eq!(agg.call_buffer_len(), 1);
1219
1220 let _ = agg.remove_instrument(&call_id);
1222 assert_eq!(agg.call_buffer_len(), 1); assert!(agg.instruments().contains_key(&sibling_id));
1224 }
1225
1226 #[rstest]
1227 fn test_remove_instrument_unknown_noop() {
1228 let (mut agg, _, _) = make_aggregator();
1229 let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1230 assert!(!agg.remove_instrument(&unknown));
1231 assert_eq!(agg.instruments().len(), 2);
1232 }
1233
1234 #[rstest]
1235 fn test_remove_instrument_cleans_pending_greeks() {
1236 let (mut agg, call_id, _) = make_aggregator();
1237 let greeks = OptionGreeks {
1238 instrument_id: call_id,
1239 greeks: OptionGreekValues {
1240 delta: 0.55,
1241 ..Default::default()
1242 },
1243 ..Default::default()
1244 };
1245 agg.update_greeks(&greeks);
1246 assert_eq!(agg.pending_greeks_count(), 1);
1247
1248 let _ = agg.remove_instrument(&call_id);
1249 assert_eq!(agg.pending_greeks_count(), 0);
1250 }
1251
1252 #[rstest]
1253 fn test_is_catalog_empty_after_full_removal() {
1254 let (mut agg, call_id, put_id) = make_aggregator();
1255 assert!(!agg.is_catalog_empty());
1256
1257 let _ = agg.remove_instrument(&call_id);
1258 assert!(!agg.is_catalog_empty());
1259
1260 let _ = agg.remove_instrument(&put_id);
1261 assert!(agg.is_catalog_empty());
1262 }
1263
1264 #[rstest]
1267 fn test_expired_quote_is_dropped() {
1268 let (mut agg, call_id, _) = make_aggregator();
1269 let expired_quote = QuoteTick::new(
1271 call_id,
1272 Price::from("100.00"),
1273 Price::from("101.00"),
1274 Quantity::from("1.0"),
1275 Quantity::from("1.0"),
1276 UnixNanos::from(1_700_000_000_000_000_000u64),
1277 UnixNanos::from(1_700_000_000_000_000_000u64),
1278 );
1279 agg.update_quote(&expired_quote);
1280 assert!(agg.is_buffer_empty());
1281 }
1282
1283 #[rstest]
1284 fn test_expired_greeks_are_dropped() {
1285 let (mut agg, call_id, _) = make_aggregator();
1286 let quote = make_quote(call_id, "100.00", "101.00");
1288 agg.update_quote("e);
1289 assert_eq!(agg.call_buffer_len(), 1);
1290
1291 let greeks = OptionGreeks {
1293 instrument_id: call_id,
1294 ts_event: UnixNanos::from(1_700_000_000_000_000_000u64),
1295 greeks: OptionGreekValues {
1296 delta: 0.55,
1297 ..Default::default()
1298 },
1299 ..Default::default()
1300 };
1301 agg.update_greeks(&greeks);
1302
1303 let strike = Price::from("50000");
1304 assert!(agg.get_call_greeks_from_buffer(&strike).is_none());
1305 }
1306}