1pub mod config;
21pub mod database;
22mod error;
23pub mod fifo;
24pub mod quote;
25pub mod refs;
26
27mod bounded;
28mod index;
29
30#[cfg(test)]
31mod tests;
32
33use std::{
34 borrow::Cow,
35 cell::{Ref, RefCell},
36 fmt::{Debug, Display},
37 rc::Rc,
38 str::FromStr,
39 time::{SystemTime, UNIX_EPOCH},
40};
41
42use ahash::{AHashMap, AHashSet};
43use bounded::BoundedVecDeque;
44use bytes::Bytes;
45pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
47pub use error::{
48 ACCOUNT_NOT_FOUND, AccountLookupError, CURRENCY_NOT_FOUND, CurrencyLookupError,
49 INSTRUMENT_NOT_FOUND, InstrumentLookupError, ORDER_BOOK_NOT_FOUND, ORDER_LIST_NOT_FOUND,
50 ORDER_NOT_FOUND, OWN_ORDER_BOOK_NOT_FOUND, OrderBookLookupError, OrderListLookupError,
51 OrderLookupError, OwnOrderBookLookupError, POSITION_NOT_FOUND, PositionLookupError,
52 SYNTHETIC_INSTRUMENT_NOT_FOUND, SyntheticInstrumentLookupError,
53};
54use index::CacheIndex;
55use nautilus_core::{
56 SharedCell, UUID4, UnixNanos,
57 correctness::{
58 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
59 check_valid_string_ascii,
60 },
61 datetime::secs_to_nanos_unchecked,
62};
63#[cfg(feature = "defi")]
64use nautilus_model::defi::{Pool, PoolProfiler};
65use nautilus_model::{
66 accounts::{Account, AccountAny},
67 data::{
68 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
69 MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
70 },
71 enums::{
72 AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
73 PriceType, TriggerType,
74 },
75 events::{AccountState, OrderEventAny},
76 identifiers::{
77 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
78 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
79 },
80 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81 orderbook::{
82 OrderBook,
83 own::{OwnOrderBook, should_handle_own_book_order},
84 },
85 orders::{Order, OrderAny, OrderError, OrderList},
86 position::Position,
87 types::{Currency, Money, Price, Quantity},
88};
89pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
90use rust_decimal::Decimal;
91use ustr::Ustr;
92
93use crate::xrate::get_exchange_rate;
94
95#[derive(Clone, Debug, PartialEq, Eq)]
100pub struct CacheSnapshotRef {
101 pub blob_ref: String,
103 pub blob: Bytes,
105}
106
107impl CacheSnapshotRef {
108 #[must_use]
110 pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
111 Self {
112 blob_ref: blob_ref.into(),
113 blob: blob.into(),
114 }
115 }
116}
117
118#[derive(Clone, Debug)]
125pub struct CacheView {
126 inner: Rc<RefCell<Cache>>,
127}
128
129impl CacheView {
130 #[must_use]
132 pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
133 Self { inner }
134 }
135
136 pub fn borrow(&self) -> Ref<'_, Cache> {
142 self.inner.borrow()
143 }
144}
145
146impl From<Rc<RefCell<Cache>>> for CacheView {
147 fn from(inner: Rc<RefCell<Cache>>) -> Self {
148 Self::new(inner)
149 }
150}
151
152#[derive(Debug)]
159pub struct CacheApi<'a> {
160 cache: &'a RefCell<Cache>,
161}
162
163impl<'a> CacheApi<'a> {
164 pub(crate) fn new(cache: &'a RefCell<Cache>) -> Self {
165 Self { cache }
166 }
167
168 #[must_use]
174 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
175 self.cache().calculate_unrealized_pnl(position)
176 }
177
178 #[must_use]
184 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
185 self.cache().oms_type(position_id)
186 }
187
188 #[must_use]
194 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
195 self.cache().position_snapshot_bytes(position_id)
196 }
197
198 #[must_use]
204 pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
205 self.cache().position_snapshot_count(position_id)
206 }
207
208 #[must_use]
214 pub fn position_snapshots(
215 &self,
216 position_id: Option<&PositionId>,
217 account_id: Option<&AccountId>,
218 ) -> Vec<Position> {
219 self.cache().position_snapshots(position_id, account_id)
220 }
221
222 #[must_use]
228 pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
229 self.cache().position_snapshots_from(position_id, skip)
230 }
231
232 #[must_use]
238 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
239 self.cache().position_snapshot_ids(instrument_id)
240 }
241
242 #[must_use]
248 pub fn client_order_ids(
249 &self,
250 venue: Option<&Venue>,
251 instrument_id: Option<&InstrumentId>,
252 strategy_id: Option<&StrategyId>,
253 account_id: Option<&AccountId>,
254 ) -> AHashSet<ClientOrderId> {
255 self.cache()
256 .client_order_ids(venue, instrument_id, strategy_id, account_id)
257 }
258
259 #[must_use]
265 pub fn client_order_ids_open(
266 &self,
267 venue: Option<&Venue>,
268 instrument_id: Option<&InstrumentId>,
269 strategy_id: Option<&StrategyId>,
270 account_id: Option<&AccountId>,
271 ) -> AHashSet<ClientOrderId> {
272 self.cache()
273 .client_order_ids_open(venue, instrument_id, strategy_id, account_id)
274 }
275
276 #[must_use]
282 pub fn client_order_ids_closed(
283 &self,
284 venue: Option<&Venue>,
285 instrument_id: Option<&InstrumentId>,
286 strategy_id: Option<&StrategyId>,
287 account_id: Option<&AccountId>,
288 ) -> AHashSet<ClientOrderId> {
289 self.cache()
290 .client_order_ids_closed(venue, instrument_id, strategy_id, account_id)
291 }
292
293 #[must_use]
299 pub fn client_order_ids_active_local(
300 &self,
301 venue: Option<&Venue>,
302 instrument_id: Option<&InstrumentId>,
303 strategy_id: Option<&StrategyId>,
304 account_id: Option<&AccountId>,
305 ) -> AHashSet<ClientOrderId> {
306 self.cache()
307 .client_order_ids_active_local(venue, instrument_id, strategy_id, account_id)
308 }
309
310 #[must_use]
316 pub fn client_order_ids_emulated(
317 &self,
318 venue: Option<&Venue>,
319 instrument_id: Option<&InstrumentId>,
320 strategy_id: Option<&StrategyId>,
321 account_id: Option<&AccountId>,
322 ) -> AHashSet<ClientOrderId> {
323 self.cache()
324 .client_order_ids_emulated(venue, instrument_id, strategy_id, account_id)
325 }
326
327 #[must_use]
333 pub fn client_order_ids_inflight(
334 &self,
335 venue: Option<&Venue>,
336 instrument_id: Option<&InstrumentId>,
337 strategy_id: Option<&StrategyId>,
338 account_id: Option<&AccountId>,
339 ) -> AHashSet<ClientOrderId> {
340 self.cache()
341 .client_order_ids_inflight(venue, instrument_id, strategy_id, account_id)
342 }
343
344 #[must_use]
350 pub fn position_ids(
351 &self,
352 venue: Option<&Venue>,
353 instrument_id: Option<&InstrumentId>,
354 strategy_id: Option<&StrategyId>,
355 account_id: Option<&AccountId>,
356 ) -> AHashSet<PositionId> {
357 self.cache()
358 .position_ids(venue, instrument_id, strategy_id, account_id)
359 }
360
361 #[must_use]
367 pub fn position_open_ids(
368 &self,
369 venue: Option<&Venue>,
370 instrument_id: Option<&InstrumentId>,
371 strategy_id: Option<&StrategyId>,
372 account_id: Option<&AccountId>,
373 ) -> AHashSet<PositionId> {
374 self.cache()
375 .position_open_ids(venue, instrument_id, strategy_id, account_id)
376 }
377
378 #[must_use]
384 pub fn position_closed_ids(
385 &self,
386 venue: Option<&Venue>,
387 instrument_id: Option<&InstrumentId>,
388 strategy_id: Option<&StrategyId>,
389 account_id: Option<&AccountId>,
390 ) -> AHashSet<PositionId> {
391 self.cache()
392 .position_closed_ids(venue, instrument_id, strategy_id, account_id)
393 }
394
395 #[must_use]
401 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
402 self.cache().actor_ids()
403 }
404
405 #[must_use]
411 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
412 self.cache().strategy_ids()
413 }
414
415 #[must_use]
421 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
422 self.cache().exec_algorithm_ids()
423 }
424
425 #[must_use]
431 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
432 self.cache().order_owned(client_order_id)
433 }
434
435 pub fn try_order(&self, client_order_id: &ClientOrderId) -> Result<OrderAny, OrderLookupError> {
446 self.cache().try_order_owned(client_order_id)
447 }
448
449 #[must_use]
455 pub fn orders_for_ids(
456 &self,
457 client_order_ids: &[ClientOrderId],
458 context: &dyn Display,
459 ) -> Vec<OrderAny> {
460 self.cache().orders_for_ids(client_order_ids, context)
461 }
462
463 #[must_use]
469 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<ClientOrderId> {
470 self.cache().client_order_id(venue_order_id).copied()
471 }
472
473 #[must_use]
479 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
480 self.cache().venue_order_id(client_order_id).copied()
481 }
482
483 #[must_use]
489 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<ClientId> {
490 self.cache().client_id(client_order_id).copied()
491 }
492
493 #[must_use]
499 pub fn orders(
500 &self,
501 venue: Option<&Venue>,
502 instrument_id: Option<&InstrumentId>,
503 strategy_id: Option<&StrategyId>,
504 account_id: Option<&AccountId>,
505 side: Option<OrderSide>,
506 ) -> Vec<OrderAny> {
507 self.cache()
508 .orders_refs(venue, instrument_id, strategy_id, account_id, side)
509 .into_iter()
510 .map(|order| order.cloned())
511 .collect()
512 }
513
514 #[must_use]
520 pub fn orders_open(
521 &self,
522 venue: Option<&Venue>,
523 instrument_id: Option<&InstrumentId>,
524 strategy_id: Option<&StrategyId>,
525 account_id: Option<&AccountId>,
526 side: Option<OrderSide>,
527 ) -> Vec<OrderAny> {
528 self.cache()
529 .orders_open_refs(venue, instrument_id, strategy_id, account_id, side)
530 .into_iter()
531 .map(|order| order.cloned())
532 .collect()
533 }
534
535 #[must_use]
541 pub fn orders_closed(
542 &self,
543 venue: Option<&Venue>,
544 instrument_id: Option<&InstrumentId>,
545 strategy_id: Option<&StrategyId>,
546 account_id: Option<&AccountId>,
547 side: Option<OrderSide>,
548 ) -> Vec<OrderAny> {
549 self.cache()
550 .orders_closed_refs(venue, instrument_id, strategy_id, account_id, side)
551 .into_iter()
552 .map(|order| order.cloned())
553 .collect()
554 }
555
556 #[must_use]
562 pub fn orders_active_local(
563 &self,
564 venue: Option<&Venue>,
565 instrument_id: Option<&InstrumentId>,
566 strategy_id: Option<&StrategyId>,
567 account_id: Option<&AccountId>,
568 side: Option<OrderSide>,
569 ) -> Vec<OrderAny> {
570 self.cache()
571 .orders_active_local_refs(venue, instrument_id, strategy_id, account_id, side)
572 .into_iter()
573 .map(|order| order.cloned())
574 .collect()
575 }
576
577 #[must_use]
583 pub fn orders_emulated(
584 &self,
585 venue: Option<&Venue>,
586 instrument_id: Option<&InstrumentId>,
587 strategy_id: Option<&StrategyId>,
588 account_id: Option<&AccountId>,
589 side: Option<OrderSide>,
590 ) -> Vec<OrderAny> {
591 self.cache()
592 .orders_emulated_refs(venue, instrument_id, strategy_id, account_id, side)
593 .into_iter()
594 .map(|order| order.cloned())
595 .collect()
596 }
597
598 #[must_use]
604 pub fn orders_inflight(
605 &self,
606 venue: Option<&Venue>,
607 instrument_id: Option<&InstrumentId>,
608 strategy_id: Option<&StrategyId>,
609 account_id: Option<&AccountId>,
610 side: Option<OrderSide>,
611 ) -> Vec<OrderAny> {
612 self.cache()
613 .orders_inflight_refs(venue, instrument_id, strategy_id, account_id, side)
614 .into_iter()
615 .map(|order| order.cloned())
616 .collect()
617 }
618
619 #[must_use]
625 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderAny> {
626 self.cache()
627 .orders_for_position(position_id)
628 .into_iter()
629 .map(|order| order.cloned())
630 .collect()
631 }
632
633 #[must_use]
639 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
640 self.cache().order_exists(client_order_id)
641 }
642
643 #[must_use]
649 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
650 self.cache().is_order_open(client_order_id)
651 }
652
653 #[must_use]
659 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
660 self.cache().is_order_closed(client_order_id)
661 }
662
663 #[must_use]
669 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
670 self.cache().is_order_active_local(client_order_id)
671 }
672
673 #[must_use]
679 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
680 self.cache().is_order_emulated(client_order_id)
681 }
682
683 #[must_use]
689 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
690 self.cache().is_order_inflight(client_order_id)
691 }
692
693 #[must_use]
699 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
700 self.cache().is_order_pending_cancel_local(client_order_id)
701 }
702
703 #[must_use]
709 pub fn orders_open_count(
710 &self,
711 venue: Option<&Venue>,
712 instrument_id: Option<&InstrumentId>,
713 strategy_id: Option<&StrategyId>,
714 account_id: Option<&AccountId>,
715 side: Option<OrderSide>,
716 ) -> usize {
717 self.cache()
718 .orders_open_count(venue, instrument_id, strategy_id, account_id, side)
719 }
720
721 #[must_use]
727 pub fn orders_closed_count(
728 &self,
729 venue: Option<&Venue>,
730 instrument_id: Option<&InstrumentId>,
731 strategy_id: Option<&StrategyId>,
732 account_id: Option<&AccountId>,
733 side: Option<OrderSide>,
734 ) -> usize {
735 self.cache()
736 .orders_closed_count(venue, instrument_id, strategy_id, account_id, side)
737 }
738
739 #[must_use]
745 pub fn orders_active_local_count(
746 &self,
747 venue: Option<&Venue>,
748 instrument_id: Option<&InstrumentId>,
749 strategy_id: Option<&StrategyId>,
750 account_id: Option<&AccountId>,
751 side: Option<OrderSide>,
752 ) -> usize {
753 self.cache()
754 .orders_active_local_count(venue, instrument_id, strategy_id, account_id, side)
755 }
756
757 #[must_use]
763 pub fn orders_emulated_count(
764 &self,
765 venue: Option<&Venue>,
766 instrument_id: Option<&InstrumentId>,
767 strategy_id: Option<&StrategyId>,
768 account_id: Option<&AccountId>,
769 side: Option<OrderSide>,
770 ) -> usize {
771 self.cache()
772 .orders_emulated_count(venue, instrument_id, strategy_id, account_id, side)
773 }
774
775 #[must_use]
781 pub fn orders_inflight_count(
782 &self,
783 venue: Option<&Venue>,
784 instrument_id: Option<&InstrumentId>,
785 strategy_id: Option<&StrategyId>,
786 account_id: Option<&AccountId>,
787 side: Option<OrderSide>,
788 ) -> usize {
789 self.cache()
790 .orders_inflight_count(venue, instrument_id, strategy_id, account_id, side)
791 }
792
793 #[must_use]
799 pub fn orders_total_count(
800 &self,
801 venue: Option<&Venue>,
802 instrument_id: Option<&InstrumentId>,
803 strategy_id: Option<&StrategyId>,
804 account_id: Option<&AccountId>,
805 side: Option<OrderSide>,
806 ) -> usize {
807 self.cache()
808 .orders_total_count(venue, instrument_id, strategy_id, account_id, side)
809 }
810
811 #[must_use]
817 pub fn has_orders_open(
818 &self,
819 venue: Option<&Venue>,
820 instrument_id: Option<&InstrumentId>,
821 strategy_id: Option<&StrategyId>,
822 account_id: Option<&AccountId>,
823 side: Option<OrderSide>,
824 ) -> bool {
825 self.cache()
826 .has_orders_open(venue, instrument_id, strategy_id, account_id, side)
827 }
828
829 #[must_use]
835 pub fn has_orders_closed(
836 &self,
837 venue: Option<&Venue>,
838 instrument_id: Option<&InstrumentId>,
839 strategy_id: Option<&StrategyId>,
840 account_id: Option<&AccountId>,
841 side: Option<OrderSide>,
842 ) -> bool {
843 self.cache()
844 .has_orders_closed(venue, instrument_id, strategy_id, account_id, side)
845 }
846
847 #[must_use]
853 pub fn has_orders_active_local(
854 &self,
855 venue: Option<&Venue>,
856 instrument_id: Option<&InstrumentId>,
857 strategy_id: Option<&StrategyId>,
858 account_id: Option<&AccountId>,
859 side: Option<OrderSide>,
860 ) -> bool {
861 self.cache()
862 .has_orders_active_local(venue, instrument_id, strategy_id, account_id, side)
863 }
864
865 #[must_use]
871 pub fn has_orders_emulated(
872 &self,
873 venue: Option<&Venue>,
874 instrument_id: Option<&InstrumentId>,
875 strategy_id: Option<&StrategyId>,
876 account_id: Option<&AccountId>,
877 side: Option<OrderSide>,
878 ) -> bool {
879 self.cache()
880 .has_orders_emulated(venue, instrument_id, strategy_id, account_id, side)
881 }
882
883 #[must_use]
889 pub fn has_orders_inflight(
890 &self,
891 venue: Option<&Venue>,
892 instrument_id: Option<&InstrumentId>,
893 strategy_id: Option<&StrategyId>,
894 account_id: Option<&AccountId>,
895 side: Option<OrderSide>,
896 ) -> bool {
897 self.cache()
898 .has_orders_inflight(venue, instrument_id, strategy_id, account_id, side)
899 }
900
901 #[must_use]
907 pub fn has_orders(
908 &self,
909 venue: Option<&Venue>,
910 instrument_id: Option<&InstrumentId>,
911 strategy_id: Option<&StrategyId>,
912 account_id: Option<&AccountId>,
913 side: Option<OrderSide>,
914 ) -> bool {
915 self.cache()
916 .has_orders(venue, instrument_id, strategy_id, account_id, side)
917 }
918
919 #[must_use]
925 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<OrderList> {
926 self.cache().order_list(order_list_id).cloned()
927 }
928
929 pub fn try_order_list(
940 &self,
941 order_list_id: &OrderListId,
942 ) -> Result<OrderList, OrderListLookupError> {
943 self.cache().try_order_list(order_list_id).cloned()
944 }
945
946 #[must_use]
952 pub fn order_lists(
953 &self,
954 venue: Option<&Venue>,
955 instrument_id: Option<&InstrumentId>,
956 strategy_id: Option<&StrategyId>,
957 account_id: Option<&AccountId>,
958 ) -> Vec<OrderList> {
959 self.cache()
960 .order_lists(venue, instrument_id, strategy_id, account_id)
961 .into_iter()
962 .cloned()
963 .collect()
964 }
965
966 #[must_use]
972 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
973 self.cache().order_list_exists(order_list_id)
974 }
975
976 #[must_use]
982 pub fn orders_for_exec_algorithm(
983 &self,
984 exec_algorithm_id: &ExecAlgorithmId,
985 venue: Option<&Venue>,
986 instrument_id: Option<&InstrumentId>,
987 strategy_id: Option<&StrategyId>,
988 account_id: Option<&AccountId>,
989 side: Option<OrderSide>,
990 ) -> Vec<OrderAny> {
991 self.cache()
992 .orders_for_exec_algorithm(
993 exec_algorithm_id,
994 venue,
995 instrument_id,
996 strategy_id,
997 account_id,
998 side,
999 )
1000 .into_iter()
1001 .map(|order| order.cloned())
1002 .collect()
1003 }
1004
1005 #[must_use]
1011 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderAny> {
1012 self.cache()
1013 .orders_for_exec_spawn(exec_spawn_id)
1014 .into_iter()
1015 .map(|order| order.cloned())
1016 .collect()
1017 }
1018
1019 #[must_use]
1025 pub fn exec_spawn_total_quantity(
1026 &self,
1027 exec_spawn_id: &ClientOrderId,
1028 active_only: bool,
1029 ) -> Option<Quantity> {
1030 self.cache()
1031 .exec_spawn_total_quantity(exec_spawn_id, active_only)
1032 }
1033
1034 #[must_use]
1040 pub fn exec_spawn_total_filled_qty(
1041 &self,
1042 exec_spawn_id: &ClientOrderId,
1043 active_only: bool,
1044 ) -> Option<Quantity> {
1045 self.cache()
1046 .exec_spawn_total_filled_qty(exec_spawn_id, active_only)
1047 }
1048
1049 #[must_use]
1055 pub fn exec_spawn_total_leaves_qty(
1056 &self,
1057 exec_spawn_id: &ClientOrderId,
1058 active_only: bool,
1059 ) -> Option<Quantity> {
1060 self.cache()
1061 .exec_spawn_total_leaves_qty(exec_spawn_id, active_only)
1062 }
1063
1064 #[must_use]
1070 pub fn position(&self, position_id: &PositionId) -> Option<Position> {
1071 self.cache()
1072 .position_ref(position_id)
1073 .map(|position| position.cloned())
1074 }
1075
1076 pub fn try_position(&self, position_id: &PositionId) -> Result<Position, PositionLookupError> {
1087 self.cache()
1088 .try_position_ref(position_id)
1089 .map(|position| position.cloned())
1090 }
1091
1092 #[must_use]
1098 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<Position> {
1099 self.cache()
1100 .position_for_order_ref(client_order_id)
1101 .map(|position| position.cloned())
1102 }
1103
1104 #[must_use]
1110 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<PositionId> {
1111 self.cache().position_id(client_order_id).copied()
1112 }
1113
1114 #[must_use]
1120 pub fn positions(
1121 &self,
1122 venue: Option<&Venue>,
1123 instrument_id: Option<&InstrumentId>,
1124 strategy_id: Option<&StrategyId>,
1125 account_id: Option<&AccountId>,
1126 side: Option<PositionSide>,
1127 ) -> Vec<Position> {
1128 self.cache()
1129 .positions_refs(venue, instrument_id, strategy_id, account_id, side)
1130 .into_iter()
1131 .map(|position| position.cloned())
1132 .collect()
1133 }
1134
1135 #[must_use]
1141 pub fn positions_open(
1142 &self,
1143 venue: Option<&Venue>,
1144 instrument_id: Option<&InstrumentId>,
1145 strategy_id: Option<&StrategyId>,
1146 account_id: Option<&AccountId>,
1147 side: Option<PositionSide>,
1148 ) -> Vec<Position> {
1149 self.cache()
1150 .positions_open_refs(venue, instrument_id, strategy_id, account_id, side)
1151 .into_iter()
1152 .map(|position| position.cloned())
1153 .collect()
1154 }
1155
1156 #[must_use]
1162 pub fn positions_closed(
1163 &self,
1164 venue: Option<&Venue>,
1165 instrument_id: Option<&InstrumentId>,
1166 strategy_id: Option<&StrategyId>,
1167 account_id: Option<&AccountId>,
1168 side: Option<PositionSide>,
1169 ) -> Vec<Position> {
1170 self.cache()
1171 .positions_closed_refs(venue, instrument_id, strategy_id, account_id, side)
1172 .into_iter()
1173 .map(|position| position.cloned())
1174 .collect()
1175 }
1176
1177 #[must_use]
1183 pub fn position_exists(&self, position_id: &PositionId) -> bool {
1184 self.cache().position_exists(position_id)
1185 }
1186
1187 #[must_use]
1193 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
1194 self.cache().is_position_open(position_id)
1195 }
1196
1197 #[must_use]
1203 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
1204 self.cache().is_position_closed(position_id)
1205 }
1206
1207 #[must_use]
1213 pub fn positions_open_count(
1214 &self,
1215 venue: Option<&Venue>,
1216 instrument_id: Option<&InstrumentId>,
1217 strategy_id: Option<&StrategyId>,
1218 account_id: Option<&AccountId>,
1219 side: Option<PositionSide>,
1220 ) -> usize {
1221 self.cache()
1222 .positions_open_count(venue, instrument_id, strategy_id, account_id, side)
1223 }
1224
1225 #[must_use]
1231 pub fn positions_closed_count(
1232 &self,
1233 venue: Option<&Venue>,
1234 instrument_id: Option<&InstrumentId>,
1235 strategy_id: Option<&StrategyId>,
1236 account_id: Option<&AccountId>,
1237 side: Option<PositionSide>,
1238 ) -> usize {
1239 self.cache()
1240 .positions_closed_count(venue, instrument_id, strategy_id, account_id, side)
1241 }
1242
1243 #[must_use]
1249 pub fn positions_total_count(
1250 &self,
1251 venue: Option<&Venue>,
1252 instrument_id: Option<&InstrumentId>,
1253 strategy_id: Option<&StrategyId>,
1254 account_id: Option<&AccountId>,
1255 side: Option<PositionSide>,
1256 ) -> usize {
1257 self.cache()
1258 .positions_total_count(venue, instrument_id, strategy_id, account_id, side)
1259 }
1260
1261 #[must_use]
1267 pub fn has_positions_open(
1268 &self,
1269 venue: Option<&Venue>,
1270 instrument_id: Option<&InstrumentId>,
1271 strategy_id: Option<&StrategyId>,
1272 account_id: Option<&AccountId>,
1273 side: Option<PositionSide>,
1274 ) -> bool {
1275 self.cache()
1276 .has_positions_open(venue, instrument_id, strategy_id, account_id, side)
1277 }
1278
1279 #[must_use]
1285 pub fn has_positions_closed(
1286 &self,
1287 venue: Option<&Venue>,
1288 instrument_id: Option<&InstrumentId>,
1289 strategy_id: Option<&StrategyId>,
1290 account_id: Option<&AccountId>,
1291 side: Option<PositionSide>,
1292 ) -> bool {
1293 self.cache()
1294 .has_positions_closed(venue, instrument_id, strategy_id, account_id, side)
1295 }
1296
1297 #[must_use]
1303 pub fn has_positions(
1304 &self,
1305 venue: Option<&Venue>,
1306 instrument_id: Option<&InstrumentId>,
1307 strategy_id: Option<&StrategyId>,
1308 account_id: Option<&AccountId>,
1309 side: Option<PositionSide>,
1310 ) -> bool {
1311 self.cache()
1312 .has_positions(venue, instrument_id, strategy_id, account_id, side)
1313 }
1314
1315 #[must_use]
1321 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<StrategyId> {
1322 self.cache().strategy_id_for_order(client_order_id).copied()
1323 }
1324
1325 #[must_use]
1331 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<StrategyId> {
1332 self.cache().strategy_id_for_position(position_id).copied()
1333 }
1334
1335 pub fn get(&self, key: &str) -> anyhow::Result<Option<Bytes>> {
1346 let cache = self.cache();
1347 let value = cache.get(key)?;
1348 Ok(value.cloned())
1349 }
1350
1351 #[must_use]
1358 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
1359 self.cache().price(instrument_id, price_type)
1360 }
1361
1362 #[must_use]
1368 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
1369 self.cache().quotes(instrument_id)
1370 }
1371
1372 #[must_use]
1378 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
1379 self.cache().trades(instrument_id)
1380 }
1381
1382 #[must_use]
1388 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
1389 self.cache().mark_prices(instrument_id)
1390 }
1391
1392 #[must_use]
1398 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
1399 self.cache().index_prices(instrument_id)
1400 }
1401
1402 #[must_use]
1408 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
1409 self.cache().funding_rates(instrument_id)
1410 }
1411
1412 #[must_use]
1418 pub fn instrument_statuses(
1419 &self,
1420 instrument_id: &InstrumentId,
1421 ) -> Option<Vec<InstrumentStatus>> {
1422 self.cache().instrument_statuses(instrument_id)
1423 }
1424
1425 #[must_use]
1431 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
1432 self.cache().bars(bar_type)
1433 }
1434
1435 #[must_use]
1441 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<OrderBook> {
1442 self.cache().order_book(instrument_id).cloned()
1443 }
1444
1445 pub fn try_order_book(
1456 &self,
1457 instrument_id: &InstrumentId,
1458 ) -> Result<OrderBook, OrderBookLookupError> {
1459 self.cache().try_order_book(instrument_id).cloned()
1460 }
1461
1462 #[must_use]
1468 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<OwnOrderBook> {
1469 self.cache().own_order_book(instrument_id).cloned()
1470 }
1471
1472 pub fn try_own_order_book(
1484 &self,
1485 instrument_id: &InstrumentId,
1486 ) -> Result<OwnOrderBook, OwnOrderBookLookupError> {
1487 self.cache().try_own_order_book(instrument_id).cloned()
1488 }
1489
1490 #[must_use]
1496 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<QuoteTick> {
1497 self.cache().quote(instrument_id).copied()
1498 }
1499
1500 #[must_use]
1508 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<QuoteTick> {
1509 self.cache().quote_at_index(instrument_id, index).copied()
1510 }
1511
1512 #[must_use]
1518 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<TradeTick> {
1519 self.cache().trade(instrument_id).copied()
1520 }
1521
1522 #[must_use]
1530 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<TradeTick> {
1531 self.cache().trade_at_index(instrument_id, index).copied()
1532 }
1533
1534 #[must_use]
1540 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<MarkPriceUpdate> {
1541 self.cache().mark_price(instrument_id).copied()
1542 }
1543
1544 #[must_use]
1550 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<IndexPriceUpdate> {
1551 self.cache().index_price(instrument_id).copied()
1552 }
1553
1554 #[must_use]
1560 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<FundingRateUpdate> {
1561 self.cache().funding_rate(instrument_id).copied()
1562 }
1563
1564 #[must_use]
1570 pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<InstrumentStatus> {
1571 self.cache().instrument_status(instrument_id).copied()
1572 }
1573
1574 #[must_use]
1580 pub fn bar(&self, bar_type: &BarType) -> Option<Bar> {
1581 self.cache().bar(bar_type).copied()
1582 }
1583
1584 #[must_use]
1592 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<Bar> {
1593 self.cache().bar_at_index(bar_type, index).copied()
1594 }
1595
1596 #[must_use]
1602 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
1603 self.cache().book_update_count(instrument_id)
1604 }
1605
1606 #[must_use]
1612 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
1613 self.cache().quote_count(instrument_id)
1614 }
1615
1616 #[must_use]
1622 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
1623 self.cache().trade_count(instrument_id)
1624 }
1625
1626 #[must_use]
1632 pub fn bar_count(&self, bar_type: &BarType) -> usize {
1633 self.cache().bar_count(bar_type)
1634 }
1635
1636 #[must_use]
1642 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
1643 self.cache().has_order_book(instrument_id)
1644 }
1645
1646 #[must_use]
1652 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
1653 self.cache().has_quote_ticks(instrument_id)
1654 }
1655
1656 #[must_use]
1662 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
1663 self.cache().has_trade_ticks(instrument_id)
1664 }
1665
1666 #[must_use]
1672 pub fn has_bars(&self, bar_type: &BarType) -> bool {
1673 self.cache().has_bars(bar_type)
1674 }
1675
1676 #[must_use]
1682 pub fn get_xrate(
1683 &self,
1684 venue: Venue,
1685 from_currency: Currency,
1686 to_currency: Currency,
1687 price_type: PriceType,
1688 ) -> Option<Decimal> {
1689 self.cache()
1690 .get_xrate(venue, from_currency, to_currency, price_type)
1691 }
1692
1693 #[must_use]
1699 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
1700 self.cache().get_mark_xrate(from_currency, to_currency)
1701 }
1702
1703 #[must_use]
1709 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1710 self.cache().yield_curve(key)
1711 }
1712
1713 #[must_use]
1719 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1720 self.cache().greeks(instrument_id)
1721 }
1722
1723 #[must_use]
1729 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<OptionGreeks> {
1730 self.cache().option_greeks(instrument_id).copied()
1731 }
1732
1733 #[must_use]
1739 pub fn currency(&self, code: &Ustr) -> Option<Currency> {
1740 self.cache().currency(code).copied()
1741 }
1742
1743 pub fn try_currency(&self, code: &Ustr) -> Result<Currency, CurrencyLookupError> {
1754 self.cache().try_currency(code).copied()
1755 }
1756
1757 #[must_use]
1763 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1764 self.cache().instrument(instrument_id).cloned()
1765 }
1766
1767 pub fn try_instrument(
1778 &self,
1779 instrument_id: &InstrumentId,
1780 ) -> Result<InstrumentAny, InstrumentLookupError> {
1781 self.cache().try_instrument(instrument_id).cloned()
1782 }
1783
1784 #[must_use]
1790 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1791 self.cache()
1792 .instrument_ids(venue)
1793 .into_iter()
1794 .copied()
1795 .collect()
1796 }
1797
1798 #[must_use]
1804 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<InstrumentAny> {
1805 self.cache()
1806 .instruments(venue, underlying)
1807 .into_iter()
1808 .cloned()
1809 .collect()
1810 }
1811
1812 #[must_use]
1819 pub fn instruments_by_parent(
1820 &self,
1821 venue: &Venue,
1822 root: &Ustr,
1823 class: InstrumentClass,
1824 ) -> Vec<InstrumentAny> {
1825 self.cache()
1826 .instruments_by_parent(venue, root, class)
1827 .into_iter()
1828 .cloned()
1829 .collect()
1830 }
1831
1832 #[must_use]
1838 pub fn bar_types(
1839 &self,
1840 instrument_id: Option<&InstrumentId>,
1841 price_type: Option<&PriceType>,
1842 aggregation_source: AggregationSource,
1843 ) -> Vec<BarType> {
1844 self.cache()
1845 .bar_types(instrument_id, price_type, aggregation_source)
1846 .into_iter()
1847 .copied()
1848 .collect()
1849 }
1850
1851 #[must_use]
1857 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<SyntheticInstrument> {
1858 self.cache().synthetic(instrument_id).cloned()
1859 }
1860
1861 pub fn try_synthetic(
1873 &self,
1874 instrument_id: &InstrumentId,
1875 ) -> Result<SyntheticInstrument, SyntheticInstrumentLookupError> {
1876 self.cache().try_synthetic(instrument_id).cloned()
1877 }
1878
1879 #[must_use]
1885 pub fn synthetic_ids(&self) -> Vec<InstrumentId> {
1886 self.cache().synthetic_ids().into_iter().copied().collect()
1887 }
1888
1889 #[must_use]
1895 pub fn synthetics(&self) -> Vec<SyntheticInstrument> {
1896 self.cache().synthetics().into_iter().cloned().collect()
1897 }
1898
1899 #[cfg(feature = "defi")]
1905 #[must_use]
1906 pub fn pool(&self, instrument_id: &InstrumentId) -> Option<Pool> {
1907 self.cache().pool(instrument_id).cloned()
1908 }
1909
1910 #[cfg(feature = "defi")]
1916 #[must_use]
1917 pub fn pool_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1918 self.cache().pool_ids(venue)
1919 }
1920
1921 #[cfg(feature = "defi")]
1927 #[must_use]
1928 pub fn pools(&self, venue: Option<&Venue>) -> Vec<Pool> {
1929 self.cache().pools(venue).into_iter().cloned().collect()
1930 }
1931
1932 #[cfg(feature = "defi")]
1938 #[must_use]
1939 pub fn pool_profiler(&self, instrument_id: &InstrumentId) -> Option<PoolProfiler> {
1940 self.cache().pool_profiler(instrument_id).cloned()
1941 }
1942
1943 #[cfg(feature = "defi")]
1949 #[must_use]
1950 pub fn pool_profiler_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1951 self.cache().pool_profiler_ids(venue)
1952 }
1953
1954 #[cfg(feature = "defi")]
1960 #[must_use]
1961 pub fn pool_profilers(&self, venue: Option<&Venue>) -> Vec<PoolProfiler> {
1962 self.cache()
1963 .pool_profilers(venue)
1964 .into_iter()
1965 .cloned()
1966 .collect()
1967 }
1968
1969 #[must_use]
1975 pub fn account(&self, account_id: &AccountId) -> Option<AccountAny> {
1976 self.cache().account_owned(account_id)
1977 }
1978
1979 pub fn try_account(&self, account_id: &AccountId) -> Result<AccountAny, AccountLookupError> {
1990 self.cache()
1991 .try_account(account_id)
1992 .map(|account| account.cloned())
1993 }
1994
1995 #[must_use]
2001 pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountAny> {
2002 self.cache().account_for_venue_owned(venue)
2003 }
2004
2005 #[must_use]
2011 pub fn account_id(&self, venue: &Venue) -> Option<AccountId> {
2012 self.cache().account_id(venue).copied()
2013 }
2014
2015 #[must_use]
2021 pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountAny> {
2022 self.cache()
2023 .accounts(account_id)
2024 .into_iter()
2025 .map(|account| account.cloned())
2026 .collect()
2027 }
2028
2029 #[must_use]
2035 pub fn accounts_all(&self) -> Vec<AccountAny> {
2036 self.cache().accounts_all_owned()
2037 }
2038
2039 fn cache(&self) -> Ref<'_, Cache> {
2040 self.cache.borrow()
2041 }
2042}
2043
2044enum FilterSources<'a, K> {
2051 Unfiltered,
2052 Empty,
2053 Sets(Vec<&'a AHashSet<K>>),
2054}
2055
2056fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
2062where
2063 K: Copy + Eq + std::hash::Hash,
2064{
2065 debug_assert!(!sources.is_empty());
2066 sources.sort_unstable_by_key(|s| s.len());
2067 let driver = sources[0];
2068 let rest = &sources[1..];
2069
2070 if rest.is_empty() {
2071 return driver.clone();
2072 }
2073
2074 driver
2075 .iter()
2076 .filter(|id| rest.iter().all(|s| s.contains(id)))
2077 .copied()
2078 .collect()
2079}
2080
2081fn intersect_pair_or_many<'a, K>(
2089 bucket: &'a AHashSet<K>,
2090 mut sources: Vec<&'a AHashSet<K>>,
2091) -> AHashSet<K>
2092where
2093 K: Copy + Eq + std::hash::Hash,
2094{
2095 debug_assert!(!sources.is_empty());
2096 if sources.len() == 1 {
2097 let filter = sources[0];
2098 let (larger, smaller) = if bucket.len() >= filter.len() {
2099 (bucket, filter)
2100 } else {
2101 (filter, bucket)
2102 };
2103 return larger.intersection(smaller).copied().collect();
2104 }
2105
2106 sources.push(bucket);
2107 intersect_filter_sources(sources)
2108}
2109
2110#[cfg_attr(
2112 feature = "python",
2113 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
2114)]
2115pub struct Cache {
2116 config: CacheConfig,
2117 index: CacheIndex,
2118 database: Option<Box<dyn CacheDatabaseAdapter>>,
2119 general: AHashMap<String, Bytes>,
2120 currencies: AHashMap<Ustr, Currency>,
2121 instruments: AHashMap<InstrumentId, InstrumentAny>,
2122 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
2123 books: AHashMap<InstrumentId, OrderBook>,
2124 own_books: AHashMap<InstrumentId, OwnOrderBook>,
2125 quotes: AHashMap<InstrumentId, BoundedVecDeque<QuoteTick>>,
2126 trades: AHashMap<InstrumentId, BoundedVecDeque<TradeTick>>,
2127 mark_xrates: AHashMap<(Currency, Currency), f64>,
2128 mark_prices: AHashMap<InstrumentId, BoundedVecDeque<MarkPriceUpdate>>,
2129 index_prices: AHashMap<InstrumentId, BoundedVecDeque<IndexPriceUpdate>>,
2130 funding_rates: AHashMap<InstrumentId, BoundedVecDeque<FundingRateUpdate>>,
2131 instrument_statuses: AHashMap<InstrumentId, BoundedVecDeque<InstrumentStatus>>,
2132 bars: AHashMap<BarType, BoundedVecDeque<Bar>>,
2133 greeks: AHashMap<InstrumentId, GreeksData>,
2134 option_greeks: AHashMap<InstrumentId, OptionGreeks>,
2135 yield_curves: AHashMap<String, YieldCurveData>,
2136 accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
2137 orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
2138 order_lists: AHashMap<OrderListId, OrderList>,
2139 positions: AHashMap<PositionId, SharedCell<Position>>,
2140 position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
2141 #[cfg(feature = "defi")]
2142 pub(crate) defi: crate::defi::cache::DefiCache,
2143}
2144
2145impl Debug for Cache {
2146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2147 f.debug_struct(stringify!(Cache))
2148 .field("config", &self.config)
2149 .field("index", &self.index)
2150 .field("general", &self.general)
2151 .field("currencies", &self.currencies)
2152 .field("instruments", &self.instruments)
2153 .field("synthetics", &self.synthetics)
2154 .field("books", &self.books)
2155 .field("own_books", &self.own_books)
2156 .field("quotes", &self.quotes)
2157 .field("trades", &self.trades)
2158 .field("mark_xrates", &self.mark_xrates)
2159 .field("mark_prices", &self.mark_prices)
2160 .field("index_prices", &self.index_prices)
2161 .field("funding_rates", &self.funding_rates)
2162 .field("instrument_statuses", &self.instrument_statuses)
2163 .field("bars", &self.bars)
2164 .field("greeks", &self.greeks)
2165 .field("option_greeks", &self.option_greeks)
2166 .field("yield_curves", &self.yield_curves)
2167 .field("accounts", &self.accounts)
2168 .field("orders", &self.orders)
2169 .field("order_lists", &self.order_lists)
2170 .field("positions", &self.positions)
2171 .field("position_snapshots", &self.position_snapshots)
2172 .finish()
2173 }
2174}
2175
2176impl Default for Cache {
2177 fn default() -> Self {
2179 Self::new(Some(CacheConfig::default()), None)
2180 }
2181}
2182
2183impl Cache {
2184 #[must_use]
2186 pub fn new(
2194 config: Option<CacheConfig>,
2195 database: Option<Box<dyn CacheDatabaseAdapter>>,
2196 ) -> Self {
2197 let config = config.unwrap_or_default();
2198 config.validate().expect("invalid `CacheConfig`");
2199
2200 Self {
2201 config,
2202 index: CacheIndex::default(),
2203 database,
2204 general: AHashMap::new(),
2205 currencies: AHashMap::new(),
2206 instruments: AHashMap::new(),
2207 synthetics: AHashMap::new(),
2208 books: AHashMap::new(),
2209 own_books: AHashMap::new(),
2210 quotes: AHashMap::new(),
2211 trades: AHashMap::new(),
2212 mark_xrates: AHashMap::new(),
2213 mark_prices: AHashMap::new(),
2214 index_prices: AHashMap::new(),
2215 funding_rates: AHashMap::new(),
2216 instrument_statuses: AHashMap::new(),
2217 bars: AHashMap::new(),
2218 greeks: AHashMap::new(),
2219 option_greeks: AHashMap::new(),
2220 yield_curves: AHashMap::new(),
2221 accounts: AHashMap::new(),
2222 orders: AHashMap::new(),
2223 order_lists: AHashMap::new(),
2224 positions: AHashMap::new(),
2225 position_snapshots: AHashMap::new(),
2226 #[cfg(feature = "defi")]
2227 defi: crate::defi::cache::DefiCache::default(),
2228 }
2229 }
2230
2231 #[must_use]
2233 pub fn memory_address(&self) -> String {
2234 format!("{:?}", std::ptr::from_ref(self))
2235 }
2236
2237 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
2241 let type_name = std::any::type_name_of_val(&*database);
2242 log::info!("Cache database adapter set: {type_name}");
2243 self.database = Some(database);
2244 }
2245
2246 pub fn cache_general(&mut self) -> anyhow::Result<()> {
2254 self.general = match &mut self.database {
2255 Some(db) => db.load()?,
2256 None => AHashMap::new(),
2257 };
2258
2259 log::info!(
2260 "Cached {} general object(s) from database",
2261 self.general.len()
2262 );
2263 Ok(())
2264 }
2265
2266 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
2272 let cache_map = match &self.database {
2273 Some(db) => db.load_all().await?,
2274 None => CacheMap::default(),
2275 };
2276
2277 self.currencies = cache_map.currencies;
2278 self.instruments = cache_map.instruments;
2279 self.synthetics = cache_map.synthetics;
2280 self.accounts = cache_map
2281 .accounts
2282 .into_iter()
2283 .map(|(id, account)| (id, SharedCell::new(account)))
2284 .collect();
2285 self.orders = cache_map
2286 .orders
2287 .into_iter()
2288 .map(|(id, order)| (id, SharedCell::new(order)))
2289 .collect();
2290 self.positions = cache_map
2291 .positions
2292 .into_iter()
2293 .map(|(id, position)| (id, SharedCell::new(position)))
2294 .collect();
2295
2296 if let Some(db) = &self.database {
2297 self.index.order_position = db.load_index_order_position()?;
2298 self.index.order_client = db.load_index_order_client()?;
2299 }
2300
2301 self.assign_position_ids_to_contingencies();
2302 Ok(())
2303 }
2304
2305 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
2311 self.currencies = match &mut self.database {
2312 Some(db) => db.load_currencies().await?,
2313 None => AHashMap::new(),
2314 };
2315
2316 log::info!("Cached {} currencies from database", self.general.len());
2317 Ok(())
2318 }
2319
2320 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
2326 self.instruments = match &mut self.database {
2327 Some(db) => db.load_instruments().await?,
2328 None => AHashMap::new(),
2329 };
2330
2331 log::info!("Cached {} instruments from database", self.general.len());
2332 Ok(())
2333 }
2334
2335 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
2341 self.synthetics = match &mut self.database {
2342 Some(db) => db.load_synthetics().await?,
2343 None => AHashMap::new(),
2344 };
2345
2346 log::info!(
2347 "Cached {} synthetic instruments from database",
2348 self.general.len()
2349 );
2350 Ok(())
2351 }
2352
2353 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
2359 self.accounts = match &mut self.database {
2360 Some(db) => db
2361 .load_accounts()
2362 .await?
2363 .into_iter()
2364 .map(|(id, account)| (id, SharedCell::new(account)))
2365 .collect(),
2366 None => AHashMap::new(),
2367 };
2368
2369 log::info!(
2370 "Cached {} synthetic instruments from database",
2371 self.general.len()
2372 );
2373 Ok(())
2374 }
2375
2376 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
2382 self.orders = match &mut self.database {
2383 Some(db) => db
2384 .load_orders()
2385 .await?
2386 .into_iter()
2387 .map(|(id, order)| (id, SharedCell::new(order)))
2388 .collect(),
2389 None => AHashMap::new(),
2390 };
2391
2392 if let Some(db) = &self.database {
2393 self.index.order_position = db.load_index_order_position()?;
2394 self.index.order_client = db.load_index_order_client()?;
2395 }
2396
2397 log::info!("Cached {} orders from database", self.general.len());
2398
2399 self.assign_position_ids_to_contingencies();
2400 Ok(())
2401 }
2402
2403 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
2409 self.positions = match &mut self.database {
2410 Some(db) => db
2411 .load_positions()
2412 .await?
2413 .into_iter()
2414 .map(|(id, position)| (id, SharedCell::new(position)))
2415 .collect(),
2416 None => AHashMap::new(),
2417 };
2418
2419 log::info!("Cached {} positions from database", self.general.len());
2420 Ok(())
2421 }
2422
2423 pub fn build_index(&mut self) {
2425 log::debug!("Building index");
2426
2427 for account_id in self.accounts.keys() {
2429 self.index
2430 .venue_account
2431 .insert(account_id.get_issuer(), *account_id);
2432 }
2433
2434 for (client_order_id, order_cell) in &self.orders {
2436 let order = order_cell.borrow();
2437 let instrument_id = order.instrument_id();
2438 let venue = instrument_id.venue;
2439 let strategy_id = order.strategy_id();
2440
2441 self.index
2443 .venue_orders
2444 .entry(venue)
2445 .or_default()
2446 .insert(*client_order_id);
2447
2448 if let Some(venue_order_id) = order.venue_order_id() {
2450 self.index
2451 .venue_order_ids
2452 .insert(venue_order_id, *client_order_id);
2453 }
2454
2455 if let Some(position_id) = order.position_id() {
2457 self.index
2458 .order_position
2459 .insert(*client_order_id, position_id);
2460 }
2461
2462 self.index
2464 .order_strategy
2465 .insert(*client_order_id, order.strategy_id());
2466
2467 self.index
2469 .instrument_orders
2470 .entry(instrument_id)
2471 .or_default()
2472 .insert(*client_order_id);
2473
2474 self.index
2476 .strategy_orders
2477 .entry(strategy_id)
2478 .or_default()
2479 .insert(*client_order_id);
2480
2481 if let Some(account_id) = order.account_id() {
2483 self.index
2484 .account_orders
2485 .entry(account_id)
2486 .or_default()
2487 .insert(*client_order_id);
2488 }
2489
2490 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2492 self.index
2493 .exec_algorithm_orders
2494 .entry(exec_algorithm_id)
2495 .or_default()
2496 .insert(*client_order_id);
2497 }
2498
2499 if let Some(exec_spawn_id) = order.exec_spawn_id() {
2501 self.index
2502 .exec_spawn_orders
2503 .entry(exec_spawn_id)
2504 .or_default()
2505 .insert(*client_order_id);
2506 }
2507
2508 self.index.orders.insert(*client_order_id);
2510
2511 if order.is_active_local() {
2513 self.index.orders_active_local.insert(*client_order_id);
2514 }
2515
2516 if order.is_open() {
2518 self.index.orders_open.insert(*client_order_id);
2519 }
2520
2521 if order.is_closed() {
2523 self.index.orders_closed.insert(*client_order_id);
2524 }
2525
2526 if let Some(emulation_trigger) = order.emulation_trigger()
2528 && emulation_trigger != TriggerType::NoTrigger
2529 && !order.is_closed()
2530 {
2531 self.index.orders_emulated.insert(*client_order_id);
2532 }
2533
2534 if order.is_inflight() {
2536 self.index.orders_inflight.insert(*client_order_id);
2537 }
2538
2539 self.index.strategies.insert(strategy_id);
2541
2542 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2544 self.index.exec_algorithms.insert(exec_algorithm_id);
2545 }
2546 }
2547
2548 for (position_id, position_cell) in &self.positions {
2550 let position = position_cell.borrow();
2551 let instrument_id = position.instrument_id;
2552 let venue = instrument_id.venue;
2553 let strategy_id = position.strategy_id;
2554
2555 self.index
2557 .venue_positions
2558 .entry(venue)
2559 .or_default()
2560 .insert(*position_id);
2561
2562 self.index
2564 .position_strategy
2565 .insert(*position_id, position.strategy_id);
2566
2567 self.index
2569 .position_orders
2570 .entry(*position_id)
2571 .or_default()
2572 .extend(position.client_order_ids());
2573
2574 self.index
2576 .instrument_positions
2577 .entry(instrument_id)
2578 .or_default()
2579 .insert(*position_id);
2580
2581 self.index
2583 .strategy_positions
2584 .entry(strategy_id)
2585 .or_default()
2586 .insert(*position_id);
2587
2588 self.index
2590 .account_positions
2591 .entry(position.account_id)
2592 .or_default()
2593 .insert(*position_id);
2594
2595 self.index.positions.insert(*position_id);
2597
2598 if position.is_open() {
2600 self.index.positions_open.insert(*position_id);
2601 }
2602
2603 if position.is_closed() {
2605 self.index.positions_closed.insert(*position_id);
2606 }
2607
2608 self.index.strategies.insert(strategy_id);
2610 }
2611 }
2612
2613 #[must_use]
2615 pub const fn has_backing(&self) -> bool {
2616 self.database.is_some()
2617 }
2618
2619 #[must_use]
2621 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
2622 let Some(quote) = self.quote(&position.instrument_id) else {
2623 log::warn!(
2624 "Cannot calculate unrealized PnL for {}, no quotes for {}",
2625 position.id,
2626 position.instrument_id
2627 );
2628 return None;
2629 };
2630
2631 let last = match position.side {
2633 PositionSide::Flat | PositionSide::NoPositionSide => {
2634 return Some(Money::zero(position.settlement_currency));
2635 }
2636 PositionSide::Long => quote.bid_price,
2637 PositionSide::Short => quote.ask_price,
2638 };
2639
2640 Some(position.unrealized_pnl(last))
2641 }
2642
2643 #[must_use]
2652 pub fn check_integrity(&mut self) -> bool {
2653 let mut error_count = 0;
2654 let failure = "Integrity failure";
2655
2656 let timestamp_us = SystemTime::now()
2658 .duration_since(UNIX_EPOCH)
2659 .expect("Time went backwards")
2660 .as_micros();
2661
2662 log::info!("Checking data integrity");
2663
2664 for account_id in self.accounts.keys() {
2666 if !self
2667 .index
2668 .venue_account
2669 .contains_key(&account_id.get_issuer())
2670 {
2671 log::error!(
2672 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
2673 );
2674 error_count += 1;
2675 }
2676 }
2677
2678 for (client_order_id, order_cell) in &self.orders {
2679 let order = order_cell.borrow();
2680
2681 if !self.index.order_strategy.contains_key(client_order_id) {
2682 log::error!(
2683 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
2684 );
2685 error_count += 1;
2686 }
2687
2688 if !self.index.orders.contains(client_order_id) {
2689 log::error!(
2690 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
2691 );
2692 error_count += 1;
2693 }
2694
2695 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
2696 log::error!(
2697 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
2698 );
2699 error_count += 1;
2700 }
2701
2702 if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
2703 {
2704 log::error!(
2705 "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
2706 );
2707 error_count += 1;
2708 }
2709
2710 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
2711 log::error!(
2712 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
2713 );
2714 error_count += 1;
2715 }
2716
2717 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
2718 log::error!(
2719 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
2720 );
2721 error_count += 1;
2722 }
2723
2724 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2725 if !self
2726 .index
2727 .exec_algorithm_orders
2728 .contains_key(&exec_algorithm_id)
2729 {
2730 log::error!(
2731 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
2732 );
2733 error_count += 1;
2734 }
2735
2736 if order.exec_spawn_id().is_none()
2737 && !self.index.exec_spawn_orders.contains_key(client_order_id)
2738 {
2739 log::error!(
2740 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
2741 );
2742 error_count += 1;
2743 }
2744 }
2745 }
2746
2747 for (position_id, position_cell) in &self.positions {
2748 let position = position_cell.borrow();
2749
2750 if !self.index.position_strategy.contains_key(position_id) {
2751 log::error!(
2752 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
2753 );
2754 error_count += 1;
2755 }
2756
2757 if !self.index.position_orders.contains_key(position_id) {
2758 log::error!(
2759 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
2760 );
2761 error_count += 1;
2762 }
2763
2764 if !self.index.positions.contains(position_id) {
2765 log::error!(
2766 "{failure} in positions: {position_id} not found in `self.index.positions`",
2767 );
2768 error_count += 1;
2769 }
2770
2771 if position.is_open() && !self.index.positions_open.contains(position_id) {
2772 log::error!(
2773 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
2774 );
2775 error_count += 1;
2776 }
2777
2778 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
2779 log::error!(
2780 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
2781 );
2782 error_count += 1;
2783 }
2784 }
2785
2786 for account_id in self.index.venue_account.values() {
2788 if !self.accounts.contains_key(account_id) {
2789 log::error!(
2790 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
2791 );
2792 error_count += 1;
2793 }
2794 }
2795
2796 for client_order_id in self.index.venue_order_ids.values() {
2797 if !self.orders.contains_key(client_order_id) {
2798 log::error!(
2799 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
2800 );
2801 error_count += 1;
2802 }
2803 }
2804
2805 for client_order_id in self.index.client_order_ids.keys() {
2806 if !self.orders.contains_key(client_order_id) {
2807 log::error!(
2808 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
2809 );
2810 error_count += 1;
2811 }
2812 }
2813
2814 for client_order_id in self.index.order_position.keys() {
2815 if !self.orders.contains_key(client_order_id) {
2816 log::error!(
2817 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
2818 );
2819 error_count += 1;
2820 }
2821 }
2822
2823 for client_order_id in self.index.order_strategy.keys() {
2825 if !self.orders.contains_key(client_order_id) {
2826 log::error!(
2827 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
2828 );
2829 error_count += 1;
2830 }
2831 }
2832
2833 for position_id in self.index.position_strategy.keys() {
2834 if !self.positions.contains_key(position_id) {
2835 log::error!(
2836 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
2837 );
2838 error_count += 1;
2839 }
2840 }
2841
2842 for position_id in self.index.position_orders.keys() {
2843 if !self.positions.contains_key(position_id) {
2844 log::error!(
2845 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
2846 );
2847 error_count += 1;
2848 }
2849 }
2850
2851 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
2852 for client_order_id in client_order_ids {
2853 if !self.orders.contains_key(client_order_id) {
2854 log::error!(
2855 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
2856 );
2857 error_count += 1;
2858 }
2859 }
2860 }
2861
2862 for instrument_id in self.index.instrument_positions.keys() {
2863 if !self.index.instrument_orders.contains_key(instrument_id) {
2864 log::error!(
2865 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
2866 );
2867 error_count += 1;
2868 }
2869 }
2870
2871 for client_order_ids in self.index.strategy_orders.values() {
2872 for client_order_id in client_order_ids {
2873 if !self.orders.contains_key(client_order_id) {
2874 log::error!(
2875 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
2876 );
2877 error_count += 1;
2878 }
2879 }
2880 }
2881
2882 for position_ids in self.index.strategy_positions.values() {
2883 for position_id in position_ids {
2884 if !self.positions.contains_key(position_id) {
2885 log::error!(
2886 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
2887 );
2888 error_count += 1;
2889 }
2890 }
2891 }
2892
2893 for client_order_id in &self.index.orders {
2894 if !self.orders.contains_key(client_order_id) {
2895 log::error!(
2896 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
2897 );
2898 error_count += 1;
2899 }
2900 }
2901
2902 for client_order_id in &self.index.orders_emulated {
2903 if !self.orders.contains_key(client_order_id) {
2904 log::error!(
2905 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
2906 );
2907 error_count += 1;
2908 }
2909 }
2910
2911 for client_order_id in &self.index.orders_active_local {
2912 if !self.orders.contains_key(client_order_id) {
2913 log::error!(
2914 "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
2915 );
2916 error_count += 1;
2917 }
2918 }
2919
2920 for client_order_id in &self.index.orders_inflight {
2921 if !self.orders.contains_key(client_order_id) {
2922 log::error!(
2923 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
2924 );
2925 error_count += 1;
2926 }
2927 }
2928
2929 for client_order_id in &self.index.orders_open {
2930 if !self.orders.contains_key(client_order_id) {
2931 log::error!(
2932 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
2933 );
2934 error_count += 1;
2935 }
2936 }
2937
2938 for client_order_id in &self.index.orders_closed {
2939 if !self.orders.contains_key(client_order_id) {
2940 log::error!(
2941 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
2942 );
2943 error_count += 1;
2944 }
2945 }
2946
2947 for position_id in &self.index.positions {
2948 if !self.positions.contains_key(position_id) {
2949 log::error!(
2950 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
2951 );
2952 error_count += 1;
2953 }
2954 }
2955
2956 for position_id in &self.index.positions_open {
2957 if !self.positions.contains_key(position_id) {
2958 log::error!(
2959 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
2960 );
2961 error_count += 1;
2962 }
2963 }
2964
2965 for position_id in &self.index.positions_closed {
2966 if !self.positions.contains_key(position_id) {
2967 log::error!(
2968 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
2969 );
2970 error_count += 1;
2971 }
2972 }
2973
2974 for strategy_id in &self.index.strategies {
2975 if !self.index.strategy_orders.contains_key(strategy_id) {
2976 log::error!(
2977 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
2978 );
2979 error_count += 1;
2980 }
2981 }
2982
2983 for exec_algorithm_id in &self.index.exec_algorithms {
2984 if !self
2985 .index
2986 .exec_algorithm_orders
2987 .contains_key(exec_algorithm_id)
2988 {
2989 log::error!(
2990 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
2991 );
2992 error_count += 1;
2993 }
2994 }
2995
2996 let total_us = SystemTime::now()
2997 .duration_since(UNIX_EPOCH)
2998 .expect("Time went backwards")
2999 .as_micros()
3000 - timestamp_us;
3001
3002 if error_count == 0 {
3003 log::info!("Integrity check passed in {total_us}μs");
3004 true
3005 } else {
3006 log::error!(
3007 "Integrity check failed with {error_count} error{} in {total_us}μs",
3008 if error_count == 1 { "" } else { "s" },
3009 );
3010 false
3011 }
3012 }
3013
3014 #[must_use]
3018 pub fn check_residuals(&self) -> bool {
3019 log::debug!("Checking residuals");
3020
3021 let mut residuals = false;
3022
3023 for order in self.orders_open(None, None, None, None, None) {
3025 residuals = true;
3026 log::warn!("Residual {order}");
3027 }
3028
3029 for position in self.positions_open(None, None, None, None, None) {
3031 residuals = true;
3032 log::warn!("Residual {position}");
3033 }
3034
3035 residuals
3036 }
3037
3038 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
3044 log::debug!(
3045 "Purging closed orders{}",
3046 if buffer_secs > 0 {
3047 format!(" with buffer_secs={buffer_secs}")
3048 } else {
3049 String::new()
3050 }
3051 );
3052
3053 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
3054
3055 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
3056
3057 'outer: for client_order_id in self.index.orders_closed.clone() {
3058 let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
3059 let order = order_cell.borrow();
3060 if order.is_closed()
3061 && let Some(ts_closed) = order.ts_closed()
3062 && ts_closed + buffer_ns <= ts_now
3063 {
3064 let linked = order.linked_order_ids().map(<[_]>::to_vec);
3065 let order_list_id = order.order_list_id();
3066 Some((linked, order_list_id))
3067 } else {
3068 None
3069 }
3070 });
3071
3072 let Some((linked, order_list_id)) = purge_target else {
3073 continue;
3074 };
3075
3076 if let Some(linked_order_ids) = linked {
3078 for linked_order_id in &linked_order_ids {
3079 if let Some(linked_order_cell) = self.orders.get(linked_order_id)
3080 && linked_order_cell.borrow().is_open()
3081 {
3082 continue 'outer;
3084 }
3085 }
3086 }
3087
3088 if let Some(order_list_id) = order_list_id {
3089 affected_order_list_ids.insert(order_list_id);
3090 }
3091
3092 self.purge_order(client_order_id);
3093 }
3094
3095 for order_list_id in affected_order_list_ids {
3096 if let Some(order_list) = self.order_lists.get(&order_list_id) {
3097 let all_purged = order_list
3098 .client_order_ids
3099 .iter()
3100 .all(|id| !self.orders.contains_key(id));
3101
3102 if all_purged {
3103 self.order_lists.remove(&order_list_id);
3104 log::info!("Purged {order_list_id}");
3105 }
3106 }
3107 }
3108 }
3109
3110 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
3112 log::debug!(
3113 "Purging closed positions{}",
3114 if buffer_secs > 0 {
3115 format!(" with buffer_secs={buffer_secs}")
3116 } else {
3117 String::new()
3118 }
3119 );
3120
3121 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
3122
3123 for position_id in self.index.positions_closed.clone() {
3124 let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
3125 let position = cell.borrow();
3126 position.is_closed()
3127 && position
3128 .ts_closed
3129 .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
3130 });
3131
3132 if should_purge {
3133 self.purge_position(position_id);
3134 }
3135 }
3136 }
3137
3138 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
3142 let order_cell = self.orders.get(&client_order_id).cloned();
3144
3145 if let Some(ref order_cell) = order_cell
3147 && order_cell.borrow().is_open()
3148 {
3149 log::warn!("Order {client_order_id} found open when purging, skipping purge");
3150 return;
3151 }
3152
3153 if let Some(ref order_cell) = order_cell {
3155 let order = order_cell.borrow();
3156 self.orders.remove(&client_order_id);
3158
3159 if let Some(venue_orders) = self
3161 .index
3162 .venue_orders
3163 .get_mut(&order.instrument_id().venue)
3164 {
3165 venue_orders.remove(&client_order_id);
3166 if venue_orders.is_empty() {
3167 self.index.venue_orders.remove(&order.instrument_id().venue);
3168 }
3169 }
3170
3171 if let Some(venue_order_id) = order.venue_order_id() {
3173 self.index.venue_order_ids.remove(&venue_order_id);
3174 }
3175
3176 if let Some(instrument_orders) =
3178 self.index.instrument_orders.get_mut(&order.instrument_id())
3179 {
3180 instrument_orders.remove(&client_order_id);
3181 if instrument_orders.is_empty() {
3182 self.index.instrument_orders.remove(&order.instrument_id());
3183 }
3184 }
3185
3186 if let Some(position_id) = order.position_id()
3188 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
3189 {
3190 position_orders.remove(&client_order_id);
3191 if position_orders.is_empty() {
3192 self.index.position_orders.remove(&position_id);
3193 }
3194 }
3195
3196 if let Some(exec_algorithm_id) = order.exec_algorithm_id()
3198 && let Some(exec_algorithm_orders) =
3199 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
3200 {
3201 exec_algorithm_orders.remove(&client_order_id);
3202 if exec_algorithm_orders.is_empty() {
3203 self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
3204 }
3205 }
3206
3207 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
3209 {
3210 strategy_orders.remove(&client_order_id);
3211 if strategy_orders.is_empty() {
3212 self.index.strategy_orders.remove(&order.strategy_id());
3213 }
3214 }
3215
3216 if let Some(account_id) = order.account_id()
3218 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
3219 {
3220 account_orders.remove(&client_order_id);
3221 if account_orders.is_empty() {
3222 self.index.account_orders.remove(&account_id);
3223 }
3224 }
3225
3226 if let Some(exec_spawn_id) = order.exec_spawn_id()
3228 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
3229 {
3230 spawn_orders.remove(&client_order_id);
3231 if spawn_orders.is_empty() {
3232 self.index.exec_spawn_orders.remove(&exec_spawn_id);
3233 }
3234 }
3235
3236 log::info!("Purged order {client_order_id}");
3237 } else {
3238 log::warn!("Order {client_order_id} not found when purging");
3239 }
3240
3241 self.index.order_position.remove(&client_order_id);
3243 let strategy_id = self.index.order_strategy.remove(&client_order_id);
3244 self.index.order_client.remove(&client_order_id);
3245 self.index.client_order_ids.remove(&client_order_id);
3246
3247 if let Some(strategy_id) = strategy_id
3249 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
3250 {
3251 strategy_orders.remove(&client_order_id);
3252 if strategy_orders.is_empty() {
3253 self.index.strategy_orders.remove(&strategy_id);
3254 }
3255 }
3256
3257 self.index.exec_spawn_orders.remove(&client_order_id);
3259
3260 self.index.orders.remove(&client_order_id);
3261 self.index.orders_active_local.remove(&client_order_id);
3262 self.index.orders_open.remove(&client_order_id);
3263 self.index.orders_closed.remove(&client_order_id);
3264 self.index.orders_emulated.remove(&client_order_id);
3265 self.index.orders_inflight.remove(&client_order_id);
3266 self.index.orders_pending_cancel.remove(&client_order_id);
3267 }
3268
3269 pub fn purge_position(&mut self, position_id: PositionId) {
3273 let position = self
3275 .positions
3276 .get(&position_id)
3277 .map(|cell| cell.borrow().clone());
3278
3279 if let Some(ref pos) = position
3281 && pos.is_open()
3282 {
3283 log::warn!("Position {position_id} found open when purging, skipping purge");
3284 return;
3285 }
3286
3287 if let Some(ref pos) = position {
3289 self.positions.remove(&position_id);
3290
3291 if let Some(venue_positions) =
3293 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
3294 {
3295 venue_positions.remove(&position_id);
3296 if venue_positions.is_empty() {
3297 self.index.venue_positions.remove(&pos.instrument_id.venue);
3298 }
3299 }
3300
3301 if let Some(instrument_positions) =
3303 self.index.instrument_positions.get_mut(&pos.instrument_id)
3304 {
3305 instrument_positions.remove(&position_id);
3306 if instrument_positions.is_empty() {
3307 self.index.instrument_positions.remove(&pos.instrument_id);
3308 }
3309 }
3310
3311 if let Some(strategy_positions) =
3313 self.index.strategy_positions.get_mut(&pos.strategy_id)
3314 {
3315 strategy_positions.remove(&position_id);
3316 if strategy_positions.is_empty() {
3317 self.index.strategy_positions.remove(&pos.strategy_id);
3318 }
3319 }
3320
3321 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
3323 account_positions.remove(&position_id);
3324 if account_positions.is_empty() {
3325 self.index.account_positions.remove(&pos.account_id);
3326 }
3327 }
3328
3329 for client_order_id in pos.client_order_ids() {
3331 self.index.order_position.remove(&client_order_id);
3332 }
3333
3334 log::info!("Purged position {position_id}");
3335 } else {
3336 log::warn!("Position {position_id} not found when purging");
3337 }
3338
3339 self.index.position_strategy.remove(&position_id);
3341 self.index.position_orders.remove(&position_id);
3342 self.index.positions.remove(&position_id);
3343 self.index.positions_open.remove(&position_id);
3344 self.index.positions_closed.remove(&position_id);
3345
3346 self.position_snapshots.remove(&position_id);
3348 }
3349
3350 fn purge_instrument_inner(&mut self, instrument_id: InstrumentId, skip_order_guard: bool) {
3374 #[cfg(feature = "defi")]
3375 let defi_found = self.defi.pools.contains_key(&instrument_id)
3376 || self.defi.pool_profilers.contains_key(&instrument_id);
3377 #[cfg(not(feature = "defi"))]
3378 let defi_found = false;
3379
3380 let found = self.instruments.contains_key(&instrument_id)
3381 || self.synthetics.contains_key(&instrument_id)
3382 || defi_found;
3383
3384 if !found {
3385 log::warn!("Instrument {instrument_id} not found when purging");
3386 return;
3387 }
3388
3389 if !skip_order_guard && let Some(orders) = self.index.instrument_orders.get(&instrument_id)
3390 {
3391 let has_non_terminal = orders
3392 .iter()
3393 .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
3394
3395 if has_non_terminal {
3396 log::warn!(
3397 "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
3398 );
3399 return;
3400 }
3401 }
3402
3403 if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
3404 let has_non_closed = positions
3405 .iter()
3406 .any(|position_id| !self.index.positions_closed.contains(position_id));
3407
3408 if has_non_closed {
3409 log::warn!(
3410 "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
3411 );
3412 return;
3413 }
3414 }
3415
3416 self.instruments.remove(&instrument_id);
3417 self.synthetics.remove(&instrument_id);
3418 self.books.remove(&instrument_id);
3419 self.own_books.remove(&instrument_id);
3420 self.quotes.remove(&instrument_id);
3421 self.trades.remove(&instrument_id);
3422 self.mark_prices.remove(&instrument_id);
3423 self.index_prices.remove(&instrument_id);
3424 self.funding_rates.remove(&instrument_id);
3425 self.instrument_statuses.remove(&instrument_id);
3426 self.greeks.remove(&instrument_id);
3427 self.option_greeks.remove(&instrument_id);
3428
3429 self.bars
3430 .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
3431
3432 #[cfg(feature = "defi")]
3433 {
3434 self.defi.pools.remove(&instrument_id);
3435 self.defi.pool_profilers.remove(&instrument_id);
3436 }
3437
3438 self.index.instrument_orders.remove(&instrument_id);
3439 self.index.instrument_positions.remove(&instrument_id);
3440
3441 log::info!("Purged instrument {instrument_id}");
3442 }
3443
3444 pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
3449 self.purge_instrument_inner(instrument_id, false);
3450 }
3451
3452 pub fn purge_instrument_skip_order_guard(&mut self, instrument_id: InstrumentId) {
3461 self.purge_instrument_inner(instrument_id, true);
3462 }
3463
3464 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
3469 log::debug!(
3470 "Purging account events{}",
3471 if lookback_secs > 0 {
3472 format!(" with lookback_secs={lookback_secs}")
3473 } else {
3474 String::new()
3475 }
3476 );
3477
3478 for account_cell in self.accounts.values() {
3479 let mut account = account_cell.borrow_mut();
3480 let event_count = account.event_count();
3481 account.purge_account_events(ts_now, lookback_secs);
3482 let count_diff = event_count - account.event_count();
3483 if count_diff > 0 {
3484 log::info!(
3485 "Purged {} event(s) from account {}",
3486 count_diff,
3487 account.id()
3488 );
3489 }
3490 }
3491 }
3492
3493 pub fn clear_index(&mut self) {
3495 self.index.clear();
3496 log::debug!("Cleared index");
3497 }
3498
3499 pub fn reset(&mut self) {
3505 log::debug!("Resetting cache");
3506
3507 self.general.clear();
3508 self.books.clear();
3509 self.own_books.clear();
3510 self.quotes.clear();
3511 self.trades.clear();
3512 self.mark_xrates.clear();
3513 self.mark_prices.clear();
3514 self.index_prices.clear();
3515 self.funding_rates.clear();
3516 self.instrument_statuses.clear();
3517 self.bars.clear();
3518 self.accounts.clear();
3519 self.orders.clear();
3520 self.order_lists.clear();
3521 self.positions.clear();
3522 self.position_snapshots.clear();
3523 self.greeks.clear();
3524 self.yield_curves.clear();
3525
3526 if self.config.drop_instruments_on_reset {
3527 self.currencies.clear();
3528 self.instruments.clear();
3529 self.synthetics.clear();
3530 }
3531
3532 #[cfg(feature = "defi")]
3533 {
3534 self.defi.pools.clear();
3535 self.defi.pool_profilers.clear();
3536 }
3537
3538 self.clear_index();
3539
3540 log::info!("Reset cache");
3541 }
3542
3543 pub fn dispose(&mut self) {
3547 self.reset();
3548
3549 if let Some(database) = &mut self.database
3550 && let Err(e) = database.close()
3551 {
3552 log::error!("Failed to close database during dispose: {e}");
3553 }
3554 }
3555
3556 pub fn flush_db(&mut self) {
3560 if let Some(database) = &mut self.database
3561 && let Err(e) = database.flush()
3562 {
3563 log::error!("Failed to flush database: {e}");
3564 }
3565 }
3566
3567 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
3575 check_valid_string_ascii(key, stringify!(key))?;
3576 check_predicate_false(value.is_empty(), stringify!(value))?;
3577
3578 log::debug!("Adding general {key}");
3579 self.general.insert(key.to_string(), value.clone());
3580
3581 if let Some(database) = &mut self.database {
3582 database.add(key.to_string(), value)?;
3583 }
3584 Ok(())
3585 }
3586
3587 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
3593 log::debug!("Adding `OrderBook` {}", book.instrument_id);
3594
3595 if self.config.save_market_data
3596 && let Some(database) = &mut self.database
3597 {
3598 database.add_order_book(&book)?;
3599 }
3600
3601 self.books.insert(book.instrument_id, book);
3602 Ok(())
3603 }
3604
3605 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
3611 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
3612
3613 self.own_books.insert(own_book.instrument_id, own_book);
3614 Ok(())
3615 }
3616
3617 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
3623 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
3624
3625 if self.config.save_market_data {
3626 }
3628
3629 let mark_prices_deque = self
3630 .mark_prices
3631 .entry(mark_price.instrument_id)
3632 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3633 mark_prices_deque.push_front(mark_price);
3634 Ok(())
3635 }
3636
3637 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
3643 log::debug!(
3644 "Adding `IndexPriceUpdate` for {}",
3645 index_price.instrument_id
3646 );
3647
3648 if self.config.save_market_data {
3649 }
3651
3652 let index_prices_deque = self
3653 .index_prices
3654 .entry(index_price.instrument_id)
3655 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3656 index_prices_deque.push_front(index_price);
3657 Ok(())
3658 }
3659
3660 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
3666 log::debug!(
3667 "Adding `FundingRateUpdate` for {}",
3668 funding_rate.instrument_id
3669 );
3670
3671 if self.config.save_market_data {
3672 }
3674
3675 let funding_rates_deque = self
3676 .funding_rates
3677 .entry(funding_rate.instrument_id)
3678 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3679 funding_rates_deque.push_front(funding_rate);
3680 Ok(())
3681 }
3682
3683 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
3689 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
3690
3691 let instrument_id = funding_rates[0].instrument_id;
3692 log::debug!(
3693 "Adding `FundingRateUpdate`[{}] {instrument_id}",
3694 funding_rates.len()
3695 );
3696
3697 if self.config.save_market_data
3698 && let Some(database) = &mut self.database
3699 {
3700 for funding_rate in funding_rates {
3701 database.add_funding_rate(funding_rate)?;
3702 }
3703 }
3704
3705 let funding_rate_deque = self
3706 .funding_rates
3707 .entry(instrument_id)
3708 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3709
3710 for funding_rate in funding_rates {
3711 funding_rate_deque.push_front(*funding_rate);
3712 }
3713 Ok(())
3714 }
3715
3716 pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
3722 log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
3723
3724 if self.config.save_market_data {
3725 }
3727
3728 let statuses_deque = self
3729 .instrument_statuses
3730 .entry(status.instrument_id)
3731 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3732 statuses_deque.push_front(status);
3733 Ok(())
3734 }
3735
3736 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
3742 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
3743
3744 if self.config.save_market_data
3745 && let Some(database) = &mut self.database
3746 {
3747 database.add_quote("e)?;
3748 }
3749
3750 let quotes_deque = self
3751 .quotes
3752 .entry(quote.instrument_id)
3753 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3754 quotes_deque.push_front(quote);
3755 Ok(())
3756 }
3757
3758 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
3764 check_slice_not_empty(quotes, stringify!(quotes))?;
3765
3766 let instrument_id = quotes[0].instrument_id;
3767 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
3768
3769 if self.config.save_market_data
3770 && let Some(database) = &mut self.database
3771 {
3772 for quote in quotes {
3773 database.add_quote(quote)?;
3774 }
3775 }
3776
3777 let quotes_deque = self
3778 .quotes
3779 .entry(instrument_id)
3780 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3781
3782 for quote in quotes {
3783 quotes_deque.push_front(*quote);
3784 }
3785 Ok(())
3786 }
3787
3788 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
3794 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
3795
3796 if self.config.save_market_data
3797 && let Some(database) = &mut self.database
3798 {
3799 database.add_trade(&trade)?;
3800 }
3801
3802 let trades_deque = self
3803 .trades
3804 .entry(trade.instrument_id)
3805 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3806 trades_deque.push_front(trade);
3807 Ok(())
3808 }
3809
3810 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
3816 check_slice_not_empty(trades, stringify!(trades))?;
3817
3818 let instrument_id = trades[0].instrument_id;
3819 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
3820
3821 if self.config.save_market_data
3822 && let Some(database) = &mut self.database
3823 {
3824 for trade in trades {
3825 database.add_trade(trade)?;
3826 }
3827 }
3828
3829 let trades_deque = self
3830 .trades
3831 .entry(instrument_id)
3832 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3833
3834 for trade in trades {
3835 trades_deque.push_front(*trade);
3836 }
3837 Ok(())
3838 }
3839
3840 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
3846 log::debug!("Adding `Bar` {}", bar.bar_type);
3847
3848 if self.config.save_market_data
3849 && let Some(database) = &mut self.database
3850 {
3851 database.add_bar(&bar)?;
3852 }
3853
3854 let bars = self
3855 .bars
3856 .entry(bar.bar_type)
3857 .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
3858 bars.push_front(bar);
3859 Ok(())
3860 }
3861
3862 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
3868 check_slice_not_empty(bars, stringify!(bars))?;
3869
3870 let bar_type = bars[0].bar_type;
3871 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
3872
3873 if self.config.save_market_data
3874 && let Some(database) = &mut self.database
3875 {
3876 for bar in bars {
3877 database.add_bar(bar)?;
3878 }
3879 }
3880
3881 let bars_deque = self
3882 .bars
3883 .entry(bar_type)
3884 .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
3885
3886 for bar in bars {
3887 bars_deque.push_front(*bar);
3888 }
3889 Ok(())
3890 }
3891
3892 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
3898 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
3899
3900 if self.config.save_market_data
3901 && let Some(_database) = &mut self.database
3902 {
3903 }
3905
3906 self.greeks.insert(greeks.instrument_id, greeks);
3907 Ok(())
3908 }
3909
3910 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
3912 self.greeks.get(instrument_id).cloned()
3913 }
3914
3915 pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
3917 log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
3918 self.option_greeks.insert(greeks.instrument_id, greeks);
3919 }
3920
3921 #[must_use]
3923 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
3924 self.option_greeks.get(instrument_id)
3925 }
3926
3927 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
3933 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
3934
3935 if self.config.save_market_data
3936 && let Some(_database) = &mut self.database
3937 {
3938 }
3940
3941 self.yield_curves
3942 .insert(yield_curve.curve_name.clone(), yield_curve);
3943 Ok(())
3944 }
3945
3946 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
3948 self.yield_curves.get(key).map(|curve| {
3949 let curve_clone = curve.clone();
3950 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
3951 as Box<dyn Fn(f64) -> f64>
3952 })
3953 }
3954
3955 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
3961 if self.currencies.contains_key(¤cy.code) {
3962 return Ok(());
3963 }
3964 log::debug!("Adding `Currency` {}", currency.code);
3965
3966 if let Some(database) = &mut self.database {
3967 database.add_currency(¤cy)?;
3968 }
3969
3970 self.currencies.insert(currency.code, currency);
3971 Ok(())
3972 }
3973
3974 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
3980 log::debug!("Adding `Instrument` {}", instrument.id());
3981
3982 if let Some(base_currency) = instrument.base_currency() {
3984 self.add_currency(base_currency)?;
3985 }
3986 self.add_currency(instrument.quote_currency())?;
3987 self.add_currency(instrument.settlement_currency())?;
3988
3989 if let Some(database) = &mut self.database {
3990 database.add_instrument(&instrument)?;
3991 }
3992
3993 self.instruments.insert(instrument.id(), instrument);
3994 Ok(())
3995 }
3996
3997 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
4003 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
4004
4005 if let Some(database) = &mut self.database {
4006 database.add_synthetic(&synthetic)?;
4007 }
4008
4009 self.synthetics.insert(synthetic.id, synthetic);
4010 Ok(())
4011 }
4012
4013 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
4019 log::debug!("Adding `Account` {}", account.id());
4020
4021 if let Some(database) = &mut self.database {
4022 database.add_account(&account)?;
4023 }
4024
4025 let account_id = account.id();
4026 self.accounts.insert(account_id, SharedCell::new(account));
4027 self.index
4028 .venue_account
4029 .insert(account_id.get_issuer(), account_id);
4030 Ok(())
4031 }
4032
4033 pub fn add_venue_order_id(
4041 &mut self,
4042 client_order_id: &ClientOrderId,
4043 venue_order_id: &VenueOrderId,
4044 overwrite: bool,
4045 ) -> anyhow::Result<()> {
4046 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
4047 && !overwrite
4048 && existing_venue_order_id != venue_order_id
4049 {
4050 anyhow::bail!(
4051 "Existing {existing_venue_order_id} for {client_order_id}
4052 did not match the given {venue_order_id}.
4053 If you are writing a test then try a different `venue_order_id`,
4054 otherwise this is probably a bug."
4055 );
4056 }
4057
4058 self.index
4059 .client_order_ids
4060 .insert(*client_order_id, *venue_order_id);
4061 self.index
4062 .venue_order_ids
4063 .insert(*venue_order_id, *client_order_id);
4064
4065 Ok(())
4066 }
4067
4068 pub fn add_order(
4080 &mut self,
4081 order: OrderAny,
4082 position_id: Option<PositionId>,
4083 client_id: Option<ClientId>,
4084 replace_existing: bool,
4085 ) -> anyhow::Result<()> {
4086 let instrument_id = order.instrument_id();
4087 let venue = instrument_id.venue;
4088 let client_order_id = order.client_order_id();
4089 let strategy_id = order.strategy_id();
4090 let exec_algorithm_id = order.exec_algorithm_id();
4091 let exec_spawn_id = order.exec_spawn_id();
4092
4093 if !replace_existing {
4094 check_key_not_in_map(
4095 &client_order_id,
4096 &self.orders,
4097 stringify!(client_order_id),
4098 stringify!(orders),
4099 )?;
4100 }
4101
4102 log::debug!("Adding {order:?}");
4103
4104 self.index.orders.insert(client_order_id);
4105
4106 if order.is_active_local() {
4107 self.index.orders_active_local.insert(client_order_id);
4108 }
4109 self.index
4110 .order_strategy
4111 .insert(client_order_id, strategy_id);
4112 self.index.strategies.insert(strategy_id);
4113
4114 self.index
4116 .venue_orders
4117 .entry(venue)
4118 .or_default()
4119 .insert(client_order_id);
4120
4121 self.index
4123 .instrument_orders
4124 .entry(instrument_id)
4125 .or_default()
4126 .insert(client_order_id);
4127
4128 self.index
4130 .strategy_orders
4131 .entry(strategy_id)
4132 .or_default()
4133 .insert(client_order_id);
4134
4135 if let Some(account_id) = order.account_id() {
4137 self.index
4138 .account_orders
4139 .entry(account_id)
4140 .or_default()
4141 .insert(client_order_id);
4142 }
4143
4144 if let Some(exec_algorithm_id) = exec_algorithm_id {
4146 self.index.exec_algorithms.insert(exec_algorithm_id);
4147
4148 self.index
4149 .exec_algorithm_orders
4150 .entry(exec_algorithm_id)
4151 .or_default()
4152 .insert(client_order_id);
4153 }
4154
4155 if let Some(exec_spawn_id) = exec_spawn_id {
4157 self.index
4158 .exec_spawn_orders
4159 .entry(exec_spawn_id)
4160 .or_default()
4161 .insert(client_order_id);
4162 }
4163
4164 if let Some(emulation_trigger) = order.emulation_trigger()
4166 && emulation_trigger != TriggerType::NoTrigger
4167 {
4168 self.index.orders_emulated.insert(client_order_id);
4169 }
4170
4171 if let Some(position_id) = position_id {
4173 self.add_position_id(
4174 &position_id,
4175 &order.instrument_id().venue,
4176 &client_order_id,
4177 &strategy_id,
4178 )?;
4179 }
4180
4181 if let Some(client_id) = client_id {
4183 self.index.order_client.insert(client_order_id, client_id);
4184 log::debug!("Indexed {client_id:?}");
4185 }
4186
4187 if let Some(database) = &mut self.database {
4188 database.add_order(&order, client_id)?;
4189 }
4194
4195 match self.orders.get(&client_order_id) {
4196 Some(order_cell) => *order_cell.borrow_mut() = order,
4199 None => {
4200 self.orders.insert(client_order_id, SharedCell::new(order));
4201 }
4202 }
4203
4204 Ok(())
4205 }
4206
4207 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
4213 let order_list_id = order_list.id;
4214 check_key_not_in_map(
4215 &order_list_id,
4216 &self.order_lists,
4217 stringify!(order_list_id),
4218 stringify!(order_lists),
4219 )?;
4220
4221 log::debug!("Adding {order_list:?}");
4222 self.order_lists.insert(order_list_id, order_list);
4223 Ok(())
4224 }
4225
4226 pub fn add_position_id(
4232 &mut self,
4233 position_id: &PositionId,
4234 venue: &Venue,
4235 client_order_id: &ClientOrderId,
4236 strategy_id: &StrategyId,
4237 ) -> anyhow::Result<()> {
4238 self.index
4239 .order_position
4240 .insert(*client_order_id, *position_id);
4241
4242 if let Some(database) = &mut self.database {
4244 database.index_order_position(*client_order_id, *position_id)?;
4245 }
4246
4247 self.index
4249 .position_strategy
4250 .insert(*position_id, *strategy_id);
4251
4252 self.index
4254 .position_orders
4255 .entry(*position_id)
4256 .or_default()
4257 .insert(*client_order_id);
4258
4259 self.index
4261 .strategy_positions
4262 .entry(*strategy_id)
4263 .or_default()
4264 .insert(*position_id);
4265
4266 self.index
4268 .venue_positions
4269 .entry(*venue)
4270 .or_default()
4271 .insert(*position_id);
4272
4273 Ok(())
4274 }
4275
4276 fn assign_position_ids_to_contingencies(&mut self) {
4285 let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
4286
4287 for parent_order_cell in self.orders.values() {
4288 let parent = parent_order_cell.borrow();
4289 if parent.contingency_type() != Some(ContingencyType::Oto) {
4290 continue;
4291 }
4292 let Some(parent_position_id) = parent.position_id() else {
4293 continue;
4294 };
4295 let Some(linked_order_ids) = parent.linked_order_ids() else {
4296 continue;
4297 };
4298
4299 for client_order_id in linked_order_ids {
4300 match self.orders.get(client_order_id) {
4301 None => {
4302 log::error!("Contingency order {client_order_id} not found");
4303 }
4304 Some(contingent_order_cell) => {
4305 if contingent_order_cell.borrow().position_id().is_none() {
4306 assignments.push((parent_position_id, *client_order_id));
4307 }
4308 }
4309 }
4310 }
4311 }
4312
4313 for (position_id, client_order_id) in assignments {
4314 let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
4315 let mut contingent = order_cell.borrow_mut();
4316 contingent.set_position_id(Some(position_id));
4317 (contingent.instrument_id().venue, contingent.strategy_id())
4318 }) else {
4319 continue;
4320 };
4321
4322 if let Err(e) =
4325 self.add_position_id(&position_id, &venue, &client_order_id, &strategy_id)
4326 {
4327 log::error!("Failed to re-index {client_order_id} -> {position_id}: {e}");
4328 }
4329 }
4330 }
4331
4332 pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
4338 self.positions
4339 .insert(position.id, SharedCell::new(position.clone()));
4340 self.index.positions.insert(position.id);
4341 self.index.positions_open.insert(position.id);
4342 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
4345
4346 self.add_position_id(
4347 &position.id,
4348 &position.instrument_id.venue,
4349 &position.opening_order_id,
4350 &position.strategy_id,
4351 )?;
4352
4353 let venue = position.instrument_id.venue;
4354 let venue_positions = self.index.venue_positions.entry(venue).or_default();
4355 venue_positions.insert(position.id);
4356
4357 let instrument_id = position.instrument_id;
4359 let instrument_positions = self
4360 .index
4361 .instrument_positions
4362 .entry(instrument_id)
4363 .or_default();
4364 instrument_positions.insert(position.id);
4365
4366 self.index
4368 .account_positions
4369 .entry(position.account_id)
4370 .or_default()
4371 .insert(position.id);
4372
4373 if let Some(database) = &mut self.database {
4374 database.add_position(position)?;
4375 }
4384
4385 Ok(())
4386 }
4387
4388 pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
4397 let account_id = account.id();
4398 match self.accounts.get(&account_id) {
4399 Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
4400 None => {
4401 self.accounts
4402 .insert(account_id, SharedCell::new(account.clone()));
4403 }
4404 }
4405
4406 if let Some(database) = &mut self.database {
4407 database.update_account(account)?;
4408 }
4409 Ok(())
4410 }
4411
4412 #[must_use]
4427 pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
4428 self.accounts.remove(account_id).map(|cell| {
4429 let rc: Rc<RefCell<AccountAny>> = cell.into();
4430 Rc::try_unwrap(rc).map_or_else(
4431 |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
4432 RefCell::into_inner,
4433 )
4434 })
4435 }
4436
4437 pub fn cache_account_owned(&mut self, account: AccountAny) {
4439 let account_id = account.id();
4440 self.index
4441 .venue_account
4442 .insert(account_id.get_issuer(), account_id);
4443 match self.accounts.get(&account_id) {
4444 Some(account_cell) => *account_cell.borrow_mut() = account,
4445 None => {
4446 self.accounts.insert(account_id, SharedCell::new(account));
4447 }
4448 }
4449 }
4450
4451 pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
4457 let account_id = account.id();
4458 self.cache_account_owned(account);
4459
4460 if let Some(database) = &mut self.database {
4461 let Some(account_cell) = self.accounts.get(&account_id) else {
4462 anyhow::bail!("Account {account_id} not found after cache update");
4463 };
4464 database.update_account(&account_cell.borrow())?;
4465 }
4466 Ok(())
4467 }
4468
4469 pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
4479 let Some(cell) = self.accounts.get(&event.account_id) else {
4480 return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
4481 };
4482
4483 cell.borrow_mut().apply(event.clone())?;
4484
4485 if let Some(database) = &mut self.database {
4486 database.update_account(&cell.borrow())?;
4487 }
4488 Ok(())
4489 }
4490
4491 pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
4500 self.refresh_order(order)?;
4501
4502 let client_order_id = order.client_order_id();
4503 match self.orders.get(&client_order_id) {
4504 Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
4507 None => {
4508 self.orders
4509 .insert(client_order_id, SharedCell::new(order.clone()));
4510 }
4511 }
4512
4513 Ok(())
4514 }
4515
4516 pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
4522 let event_client_order_id = event.client_order_id();
4523 let client_order_id = if self.order_exists(&event_client_order_id) {
4524 event_client_order_id
4525 } else if let Some(venue_order_id) = event.venue_order_id() {
4526 self.index
4527 .venue_order_ids
4528 .get(&venue_order_id)
4529 .copied()
4530 .ok_or(OrderError::NotFound(event_client_order_id))?
4531 } else {
4532 return Err(OrderError::NotFound(event_client_order_id).into());
4533 };
4534
4535 let order_cell = self
4536 .orders
4537 .get(&client_order_id)
4538 .cloned()
4539 .ok_or(OrderError::NotFound(client_order_id))?;
4540
4541 let mut snapshot = order_cell.borrow().clone();
4545 snapshot.apply(event.clone())?;
4546 *order_cell.borrow_mut() = snapshot.clone();
4547
4548 if let Err(e) = self.refresh_order(&snapshot) {
4549 log::error!("Error updating order in cache: {e}");
4550 }
4551
4552 Ok(snapshot)
4553 }
4554
4555 fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
4556 let client_order_id = order.client_order_id();
4557
4558 if order.is_active_local() {
4559 self.index.orders_active_local.insert(client_order_id);
4560 } else {
4561 self.index.orders_active_local.remove(&client_order_id);
4562 }
4563
4564 if let Some(venue_order_id) = order.venue_order_id() {
4566 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
4569 let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
4570 if let Err(e) =
4571 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
4572 {
4573 log::error!("Error indexing venue order ID in cache: {e}");
4574 }
4575 }
4576 }
4577
4578 if order.is_inflight() {
4580 self.index.orders_inflight.insert(client_order_id);
4581 } else {
4582 self.index.orders_inflight.remove(&client_order_id);
4583 }
4584
4585 if order.is_open() {
4587 self.index.orders_closed.remove(&client_order_id);
4588 self.index.orders_open.insert(client_order_id);
4589 } else if order.is_closed() {
4590 self.index.orders_open.remove(&client_order_id);
4591 self.index.orders_pending_cancel.remove(&client_order_id);
4592 self.index.orders_closed.insert(client_order_id);
4593 }
4594
4595 if let Some(emulation_trigger) = order.emulation_trigger()
4597 && emulation_trigger != TriggerType::NoTrigger
4598 && !order.is_closed()
4599 {
4600 self.index.orders_emulated.insert(client_order_id);
4601 } else {
4602 self.index.orders_emulated.remove(&client_order_id);
4603 }
4604
4605 if let Some(account_id) = order.account_id() {
4607 self.index
4608 .account_orders
4609 .entry(account_id)
4610 .or_default()
4611 .insert(client_order_id);
4612 }
4613
4614 if !self.own_books.is_empty() {
4616 let own_book = self.own_order_book(&order.instrument_id());
4617 if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
4618 self.update_own_order_book(order);
4619 }
4620 }
4621
4622 if let Some(database) = &mut self.database {
4623 database.update_order(order.last_event())?;
4624 }
4629
4630 Ok(())
4631 }
4632
4633 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
4635 self.index
4636 .orders_pending_cancel
4637 .insert(order.client_order_id());
4638 }
4639
4640 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
4649 if position.is_open() {
4652 self.index.positions_open.insert(position.id);
4653 self.index.positions_closed.remove(&position.id);
4654 } else {
4655 self.index.positions_closed.insert(position.id);
4656 self.index.positions_open.remove(&position.id);
4657 }
4658
4659 if let Some(database) = &mut self.database {
4660 database.update_position(position)?;
4661 }
4666
4667 match self.positions.get(&position.id) {
4668 Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
4669 None => {
4670 self.positions
4671 .insert(position.id, SharedCell::new(position.clone()));
4672 }
4673 }
4674
4675 Ok(())
4676 }
4677
4678 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
4685 let position_id = position.id;
4686
4687 let mut copied_position = position.clone();
4688 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
4689 copied_position.id = PositionId::new(new_id);
4690
4691 let position_serialized = serde_json::to_vec(&copied_position)?;
4693 let snapshot_index = self.position_snapshot_count(&position_id);
4694 let blob_ref = format!(
4695 "cache://position-snapshots/{}/{}",
4696 position_id.as_str(),
4697 snapshot_index,
4698 );
4699 let snapshot_blob = Bytes::from(position_serialized);
4700
4701 self.add(&blob_ref, snapshot_blob.clone())?;
4702 self.position_snapshots
4703 .entry(position_id)
4704 .or_default()
4705 .push(snapshot_blob.clone());
4706
4707 log::debug!("Snapshot {copied_position}");
4708 Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
4709 }
4710
4711 pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
4721 if let Some(blob) = self.snapshot_blob(blob_ref) {
4722 return Ok(Some(blob));
4723 }
4724
4725 if self.database.is_some() {
4726 self.cache_general()?;
4727 }
4728
4729 Ok(self.snapshot_blob(blob_ref))
4730 }
4731
4732 pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
4742 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
4743 validate_position_snapshot_blob(&position_id, blob.as_ref())?;
4744
4745 let frames = self.position_snapshots.entry(position_id).or_default();
4746 match frames.get(snapshot_index) {
4747 Some(existing) if existing == &blob => {}
4748 Some(_) => {
4749 anyhow::bail!(
4750 "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
4751 );
4752 }
4753 None if frames.len() == snapshot_index => frames.push(blob.clone()),
4754 None => {
4755 anyhow::bail!(
4756 "position snapshot blob_ref {blob_ref} skips missing frame {}",
4757 frames.len()
4758 );
4759 }
4760 }
4761
4762 self.general.insert(blob_ref.to_string(), blob);
4763 Ok(())
4764 }
4765
4766 fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
4767 if let Some(blob) = self.general.get(blob_ref) {
4768 return Some(blob.clone());
4769 }
4770
4771 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
4772 self.position_snapshots
4773 .get(&position_id)
4774 .and_then(|frames| frames.get(snapshot_index))
4775 .cloned()
4776 }
4777
4778 pub fn snapshot_position_state(
4784 &mut self,
4785 position: &Position,
4786 ts_snapshot: UnixNanos,
4787 unrealized_pnl: Option<Money>,
4788 open_only: Option<bool>,
4789 ) -> anyhow::Result<()> {
4790 let open_only = open_only.unwrap_or(true);
4791
4792 if open_only && !position.is_open() {
4793 return Ok(());
4794 }
4795
4796 if let Some(database) = &mut self.database {
4797 database
4798 .snapshot_position_state(position, ts_snapshot, unrealized_pnl)
4799 .map_err(|e| {
4800 log::error!(
4801 "Failed to snapshot position state for {}: {e:?}",
4802 position.id
4803 );
4804 e
4805 })?;
4806 } else {
4807 log::warn!(
4808 "Cannot snapshot position state for {} (no database configured)",
4809 position.id
4810 );
4811 }
4812
4813 Ok(())
4814 }
4815
4816 #[must_use]
4818 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
4819 if self.index.position_strategy.contains_key(position_id) {
4821 Some(OmsType::Netting)
4824 } else {
4825 None
4826 }
4827 }
4828
4829 #[must_use]
4834 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
4835 self.position_snapshots
4836 .get(position_id)
4837 .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
4838 }
4839
4840 #[must_use]
4844 pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
4845 self.position_snapshots.get(position_id).map_or(0, Vec::len)
4846 }
4847
4848 #[must_use]
4854 pub fn position_snapshots(
4855 &self,
4856 position_id: Option<&PositionId>,
4857 account_id: Option<&AccountId>,
4858 ) -> Vec<Position> {
4859 let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
4860 Some(pid) => match self.position_snapshots.get(pid) {
4861 Some(v) => Box::new(v.iter()),
4862 None => Box::new(std::iter::empty()),
4863 },
4864 None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
4865 };
4866
4867 let mut results: Vec<Position> = frames
4868 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
4869 Ok(position) => Some(position),
4870 Err(e) => {
4871 log::warn!("Failed to decode position snapshot: {e}");
4872 None
4873 }
4874 })
4875 .collect();
4876
4877 if let Some(aid) = account_id {
4878 results.retain(|p| p.account_id == *aid);
4879 }
4880
4881 results
4882 }
4883
4884 #[must_use]
4890 pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
4891 let Some(frames) = self.position_snapshots.get(position_id) else {
4892 return Vec::new();
4893 };
4894
4895 frames
4896 .iter()
4897 .skip(skip)
4898 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
4899 Ok(position) => Some(position),
4900 Err(e) => {
4901 log::warn!("Failed to decode position snapshot: {e}");
4902 None
4903 }
4904 })
4905 .collect()
4906 }
4907
4908 #[must_use]
4910 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
4911 let mut result = AHashSet::new();
4913
4914 for (position_id, _) in &self.position_snapshots {
4915 if let Some(position_cell) = self.positions.get(position_id)
4917 && position_cell.borrow().instrument_id == *instrument_id
4918 {
4919 result.insert(*position_id);
4920 }
4921 }
4922 result
4923 }
4924
4925 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
4931 let Some(database) = &self.database else {
4932 log::warn!(
4933 "Cannot snapshot order state for {} (no database configured)",
4934 order.client_order_id()
4935 );
4936 return Ok(());
4937 };
4938
4939 database.snapshot_order_state(order)
4940 }
4941
4942 fn collect_order_filter_sources<'a>(
4953 &'a self,
4954 venue: Option<&Venue>,
4955 instrument_id: Option<&InstrumentId>,
4956 strategy_id: Option<&StrategyId>,
4957 account_id: Option<&AccountId>,
4958 ) -> FilterSources<'a, ClientOrderId> {
4959 let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
4960
4961 if let Some(venue) = venue {
4962 match self.index.venue_orders.get(venue) {
4963 Some(set) => sources.push(set),
4964 None => return FilterSources::Empty,
4965 }
4966 }
4967
4968 if let Some(instrument_id) = instrument_id {
4969 match self.index.instrument_orders.get(instrument_id) {
4970 Some(set) => sources.push(set),
4971 None => return FilterSources::Empty,
4972 }
4973 }
4974
4975 if let Some(strategy_id) = strategy_id {
4976 match self.index.strategy_orders.get(strategy_id) {
4977 Some(set) => sources.push(set),
4978 None => return FilterSources::Empty,
4979 }
4980 }
4981
4982 if let Some(account_id) = account_id {
4983 match self.index.account_orders.get(account_id) {
4984 Some(set) => sources.push(set),
4985 None => return FilterSources::Empty,
4986 }
4987 }
4988
4989 if sources.is_empty() {
4990 FilterSources::Unfiltered
4991 } else {
4992 FilterSources::Sets(sources)
4993 }
4994 }
4995
4996 fn collect_position_filter_sources<'a>(
4997 &'a self,
4998 venue: Option<&Venue>,
4999 instrument_id: Option<&InstrumentId>,
5000 strategy_id: Option<&StrategyId>,
5001 account_id: Option<&AccountId>,
5002 ) -> FilterSources<'a, PositionId> {
5003 let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
5004
5005 if let Some(venue) = venue {
5006 match self.index.venue_positions.get(venue) {
5007 Some(set) => sources.push(set),
5008 None => return FilterSources::Empty,
5009 }
5010 }
5011
5012 if let Some(instrument_id) = instrument_id {
5013 match self.index.instrument_positions.get(instrument_id) {
5014 Some(set) => sources.push(set),
5015 None => return FilterSources::Empty,
5016 }
5017 }
5018
5019 if let Some(strategy_id) = strategy_id {
5020 match self.index.strategy_positions.get(strategy_id) {
5021 Some(set) => sources.push(set),
5022 None => return FilterSources::Empty,
5023 }
5024 }
5025
5026 if let Some(account_id) = account_id {
5027 match self.index.account_positions.get(account_id) {
5028 Some(set) => sources.push(set),
5029 None => return FilterSources::Empty,
5030 }
5031 }
5032
5033 if sources.is_empty() {
5034 FilterSources::Unfiltered
5035 } else {
5036 FilterSources::Sets(sources)
5037 }
5038 }
5039
5040 fn query_orders_in_bucket(
5046 &self,
5047 bucket: &AHashSet<ClientOrderId>,
5048 venue: Option<&Venue>,
5049 instrument_id: Option<&InstrumentId>,
5050 strategy_id: Option<&StrategyId>,
5051 account_id: Option<&AccountId>,
5052 ) -> AHashSet<ClientOrderId> {
5053 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5054 FilterSources::Empty => AHashSet::new(),
5055 FilterSources::Unfiltered => bucket.clone(),
5056 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
5057 }
5058 }
5059
5060 fn query_positions_in_bucket(
5061 &self,
5062 bucket: &AHashSet<PositionId>,
5063 venue: Option<&Venue>,
5064 instrument_id: Option<&InstrumentId>,
5065 strategy_id: Option<&StrategyId>,
5066 account_id: Option<&AccountId>,
5067 ) -> AHashSet<PositionId> {
5068 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5069 FilterSources::Empty => AHashSet::new(),
5070 FilterSources::Unfiltered => bucket.clone(),
5071 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
5072 }
5073 }
5074
5075 fn view_orders_in_bucket<'a>(
5078 &'a self,
5079 bucket: &'a AHashSet<ClientOrderId>,
5080 venue: Option<&Venue>,
5081 instrument_id: Option<&InstrumentId>,
5082 strategy_id: Option<&StrategyId>,
5083 account_id: Option<&AccountId>,
5084 ) -> Cow<'a, AHashSet<ClientOrderId>> {
5085 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5086 FilterSources::Empty => Cow::Owned(AHashSet::new()),
5087 FilterSources::Unfiltered => Cow::Borrowed(bucket),
5088 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
5089 }
5090 }
5091
5092 fn view_positions_in_bucket<'a>(
5093 &'a self,
5094 bucket: &'a AHashSet<PositionId>,
5095 venue: Option<&Venue>,
5096 instrument_id: Option<&InstrumentId>,
5097 strategy_id: Option<&StrategyId>,
5098 account_id: Option<&AccountId>,
5099 ) -> Cow<'a, AHashSet<PositionId>> {
5100 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5101 FilterSources::Empty => Cow::Owned(AHashSet::new()),
5102 FilterSources::Unfiltered => Cow::Borrowed(bucket),
5103 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
5104 }
5105 }
5106
5107 fn iter_orders_in_bucket<'a>(
5112 &'a self,
5113 bucket: &'a AHashSet<ClientOrderId>,
5114 venue: Option<&Venue>,
5115 instrument_id: Option<&InstrumentId>,
5116 strategy_id: Option<&StrategyId>,
5117 account_id: Option<&AccountId>,
5118 ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
5119 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5120 FilterSources::Empty => Box::new(std::iter::empty()),
5121 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
5122 FilterSources::Sets(mut sources) => {
5123 sources.push(bucket);
5124 sources.sort_unstable_by_key(|s| s.len());
5125 let driver = sources[0];
5126 let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
5127 Box::new(
5128 driver
5129 .iter()
5130 .copied()
5131 .filter(move |id| rest.iter().all(|s| s.contains(id))),
5132 )
5133 }
5134 }
5135 }
5136
5137 fn iter_positions_in_bucket<'a>(
5138 &'a self,
5139 bucket: &'a AHashSet<PositionId>,
5140 venue: Option<&Venue>,
5141 instrument_id: Option<&InstrumentId>,
5142 strategy_id: Option<&StrategyId>,
5143 account_id: Option<&AccountId>,
5144 ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
5145 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5146 FilterSources::Empty => Box::new(std::iter::empty()),
5147 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
5148 FilterSources::Sets(mut sources) => {
5149 sources.push(bucket);
5150 sources.sort_unstable_by_key(|s| s.len());
5151 let driver = sources[0];
5152 let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
5153 Box::new(
5154 driver
5155 .iter()
5156 .copied()
5157 .filter(move |id| rest.iter().all(|s| s.contains(id))),
5158 )
5159 }
5160 }
5161 }
5162
5163 fn count_orders_in_bucket(
5169 &self,
5170 bucket: &AHashSet<ClientOrderId>,
5171 venue: Option<&Venue>,
5172 instrument_id: Option<&InstrumentId>,
5173 strategy_id: Option<&StrategyId>,
5174 account_id: Option<&AccountId>,
5175 side: Option<OrderSide>,
5176 ) -> usize {
5177 let side = side.unwrap_or(OrderSide::NoOrderSide);
5178
5179 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5180 FilterSources::Empty => 0,
5181 FilterSources::Unfiltered => {
5182 if side == OrderSide::NoOrderSide {
5183 bucket.len()
5184 } else {
5185 bucket
5186 .iter()
5187 .filter(|id| self.order_side_matches(id, side))
5188 .count()
5189 }
5190 }
5191 FilterSources::Sets(mut sources) => {
5192 sources.push(bucket);
5193 sources.sort_unstable_by_key(|s| s.len());
5194 let driver = sources[0];
5195 let rest = &sources[1..];
5196
5197 driver
5198 .iter()
5199 .filter(|id| rest.iter().all(|s| s.contains(id)))
5200 .filter(|id| {
5201 side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
5202 })
5203 .count()
5204 }
5205 }
5206 }
5207
5208 fn count_positions_in_bucket(
5209 &self,
5210 bucket: &AHashSet<PositionId>,
5211 venue: Option<&Venue>,
5212 instrument_id: Option<&InstrumentId>,
5213 strategy_id: Option<&StrategyId>,
5214 account_id: Option<&AccountId>,
5215 side: Option<PositionSide>,
5216 ) -> usize {
5217 let side = side.unwrap_or(PositionSide::NoPositionSide);
5218
5219 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5220 FilterSources::Empty => 0,
5221 FilterSources::Unfiltered => {
5222 if side == PositionSide::NoPositionSide {
5223 bucket.len()
5224 } else {
5225 bucket
5226 .iter()
5227 .filter(|id| self.position_side_matches(id, side))
5228 .count()
5229 }
5230 }
5231 FilterSources::Sets(mut sources) => {
5232 sources.push(bucket);
5233 sources.sort_unstable_by_key(|s| s.len());
5234 let driver = sources[0];
5235 let rest = &sources[1..];
5236
5237 driver
5238 .iter()
5239 .filter(|id| rest.iter().all(|s| s.contains(id)))
5240 .filter(|id| {
5241 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
5242 })
5243 .count()
5244 }
5245 }
5246 }
5247
5248 fn any_orders_in_bucket(
5254 &self,
5255 bucket: &AHashSet<ClientOrderId>,
5256 venue: Option<&Venue>,
5257 instrument_id: Option<&InstrumentId>,
5258 strategy_id: Option<&StrategyId>,
5259 account_id: Option<&AccountId>,
5260 side: Option<OrderSide>,
5261 ) -> bool {
5262 let side = side.unwrap_or(OrderSide::NoOrderSide);
5263
5264 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5265 FilterSources::Empty => false,
5266 FilterSources::Unfiltered => {
5267 if side == OrderSide::NoOrderSide {
5268 !bucket.is_empty()
5269 } else {
5270 bucket.iter().any(|id| self.order_side_matches(id, side))
5271 }
5272 }
5273 FilterSources::Sets(mut sources) => {
5274 sources.push(bucket);
5275 sources.sort_unstable_by_key(|s| s.len());
5276 let driver = sources[0];
5277 let rest = &sources[1..];
5278
5279 driver
5280 .iter()
5281 .filter(|id| rest.iter().all(|s| s.contains(id)))
5282 .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
5283 }
5284 }
5285 }
5286
5287 fn any_positions_in_bucket(
5288 &self,
5289 bucket: &AHashSet<PositionId>,
5290 venue: Option<&Venue>,
5291 instrument_id: Option<&InstrumentId>,
5292 strategy_id: Option<&StrategyId>,
5293 account_id: Option<&AccountId>,
5294 side: Option<PositionSide>,
5295 ) -> bool {
5296 let side = side.unwrap_or(PositionSide::NoPositionSide);
5297
5298 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5299 FilterSources::Empty => false,
5300 FilterSources::Unfiltered => {
5301 if side == PositionSide::NoPositionSide {
5302 !bucket.is_empty()
5303 } else {
5304 bucket.iter().any(|id| self.position_side_matches(id, side))
5305 }
5306 }
5307 FilterSources::Sets(mut sources) => {
5308 sources.push(bucket);
5309 sources.sort_unstable_by_key(|s| s.len());
5310 let driver = sources[0];
5311 let rest = &sources[1..];
5312
5313 driver
5314 .iter()
5315 .filter(|id| rest.iter().all(|s| s.contains(id)))
5316 .any(|id| {
5317 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
5318 })
5319 }
5320 }
5321 }
5322
5323 fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
5324 self.orders
5325 .get(client_order_id)
5326 .is_some_and(|cell| cell.borrow().order_side() == side)
5327 }
5328
5329 fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
5330 self.positions
5331 .get(position_id)
5332 .is_some_and(|cell| cell.borrow().side == side)
5333 }
5334
5335 fn get_orders_for_ids(
5341 &self,
5342 client_order_ids: &AHashSet<ClientOrderId>,
5343 side: Option<OrderSide>,
5344 ) -> Vec<OrderRef<'_>> {
5345 let side = side.unwrap_or(OrderSide::NoOrderSide);
5346 let mut orders = Vec::new();
5347
5348 for client_order_id in client_order_ids {
5349 let order_cell = self
5350 .orders
5351 .get(client_order_id)
5352 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
5353 let order = OrderRef::new(order_cell.borrow());
5354
5355 if side == OrderSide::NoOrderSide || side == order.order_side() {
5356 orders.push(order);
5357 }
5358 }
5359
5360 orders.sort_by_key(|o| o.client_order_id());
5363 orders
5364 }
5365
5366 fn get_positions_for_ids(
5376 &self,
5377 position_ids: &AHashSet<PositionId>,
5378 side: Option<PositionSide>,
5379 ) -> Vec<PositionRef<'_>> {
5380 let side = side.unwrap_or(PositionSide::NoPositionSide);
5381 let mut positions = Vec::new();
5382
5383 for position_id in position_ids {
5384 let position_cell = self
5385 .positions
5386 .get(position_id)
5387 .unwrap_or_else(|| panic!("Position {position_id} not found"));
5388 let position = PositionRef::new(position_cell.borrow());
5389
5390 if side == PositionSide::NoPositionSide || side == position.side {
5391 positions.push(position);
5392 }
5393 }
5394
5395 positions.sort_by_key(|p| p.id);
5398 positions
5399 }
5400
5401 #[must_use]
5403 pub fn client_order_ids(
5404 &self,
5405 venue: Option<&Venue>,
5406 instrument_id: Option<&InstrumentId>,
5407 strategy_id: Option<&StrategyId>,
5408 account_id: Option<&AccountId>,
5409 ) -> AHashSet<ClientOrderId> {
5410 self.query_orders_in_bucket(
5411 &self.index.orders,
5412 venue,
5413 instrument_id,
5414 strategy_id,
5415 account_id,
5416 )
5417 }
5418
5419 #[must_use]
5421 pub fn client_order_ids_open(
5422 &self,
5423 venue: Option<&Venue>,
5424 instrument_id: Option<&InstrumentId>,
5425 strategy_id: Option<&StrategyId>,
5426 account_id: Option<&AccountId>,
5427 ) -> AHashSet<ClientOrderId> {
5428 self.query_orders_in_bucket(
5429 &self.index.orders_open,
5430 venue,
5431 instrument_id,
5432 strategy_id,
5433 account_id,
5434 )
5435 }
5436
5437 #[must_use]
5439 pub fn client_order_ids_closed(
5440 &self,
5441 venue: Option<&Venue>,
5442 instrument_id: Option<&InstrumentId>,
5443 strategy_id: Option<&StrategyId>,
5444 account_id: Option<&AccountId>,
5445 ) -> AHashSet<ClientOrderId> {
5446 self.query_orders_in_bucket(
5447 &self.index.orders_closed,
5448 venue,
5449 instrument_id,
5450 strategy_id,
5451 account_id,
5452 )
5453 }
5454
5455 #[must_use]
5460 pub fn client_order_ids_active_local(
5461 &self,
5462 venue: Option<&Venue>,
5463 instrument_id: Option<&InstrumentId>,
5464 strategy_id: Option<&StrategyId>,
5465 account_id: Option<&AccountId>,
5466 ) -> AHashSet<ClientOrderId> {
5467 self.query_orders_in_bucket(
5468 &self.index.orders_active_local,
5469 venue,
5470 instrument_id,
5471 strategy_id,
5472 account_id,
5473 )
5474 }
5475
5476 #[must_use]
5478 pub fn client_order_ids_emulated(
5479 &self,
5480 venue: Option<&Venue>,
5481 instrument_id: Option<&InstrumentId>,
5482 strategy_id: Option<&StrategyId>,
5483 account_id: Option<&AccountId>,
5484 ) -> AHashSet<ClientOrderId> {
5485 self.query_orders_in_bucket(
5486 &self.index.orders_emulated,
5487 venue,
5488 instrument_id,
5489 strategy_id,
5490 account_id,
5491 )
5492 }
5493
5494 #[must_use]
5496 pub fn client_order_ids_inflight(
5497 &self,
5498 venue: Option<&Venue>,
5499 instrument_id: Option<&InstrumentId>,
5500 strategy_id: Option<&StrategyId>,
5501 account_id: Option<&AccountId>,
5502 ) -> AHashSet<ClientOrderId> {
5503 self.query_orders_in_bucket(
5504 &self.index.orders_inflight,
5505 venue,
5506 instrument_id,
5507 strategy_id,
5508 account_id,
5509 )
5510 }
5511
5512 #[must_use]
5514 pub fn position_ids(
5515 &self,
5516 venue: Option<&Venue>,
5517 instrument_id: Option<&InstrumentId>,
5518 strategy_id: Option<&StrategyId>,
5519 account_id: Option<&AccountId>,
5520 ) -> AHashSet<PositionId> {
5521 self.query_positions_in_bucket(
5522 &self.index.positions,
5523 venue,
5524 instrument_id,
5525 strategy_id,
5526 account_id,
5527 )
5528 }
5529
5530 #[must_use]
5532 pub fn position_open_ids(
5533 &self,
5534 venue: Option<&Venue>,
5535 instrument_id: Option<&InstrumentId>,
5536 strategy_id: Option<&StrategyId>,
5537 account_id: Option<&AccountId>,
5538 ) -> AHashSet<PositionId> {
5539 self.query_positions_in_bucket(
5540 &self.index.positions_open,
5541 venue,
5542 instrument_id,
5543 strategy_id,
5544 account_id,
5545 )
5546 }
5547
5548 #[must_use]
5550 pub fn position_closed_ids(
5551 &self,
5552 venue: Option<&Venue>,
5553 instrument_id: Option<&InstrumentId>,
5554 strategy_id: Option<&StrategyId>,
5555 account_id: Option<&AccountId>,
5556 ) -> AHashSet<PositionId> {
5557 self.query_positions_in_bucket(
5558 &self.index.positions_closed,
5559 venue,
5560 instrument_id,
5561 strategy_id,
5562 account_id,
5563 )
5564 }
5565
5566 #[must_use]
5573 pub fn client_order_ids_view(
5574 &self,
5575 venue: Option<&Venue>,
5576 instrument_id: Option<&InstrumentId>,
5577 strategy_id: Option<&StrategyId>,
5578 account_id: Option<&AccountId>,
5579 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5580 self.view_orders_in_bucket(
5581 &self.index.orders,
5582 venue,
5583 instrument_id,
5584 strategy_id,
5585 account_id,
5586 )
5587 }
5588
5589 #[must_use]
5591 pub fn client_order_ids_open_view(
5592 &self,
5593 venue: Option<&Venue>,
5594 instrument_id: Option<&InstrumentId>,
5595 strategy_id: Option<&StrategyId>,
5596 account_id: Option<&AccountId>,
5597 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5598 self.view_orders_in_bucket(
5599 &self.index.orders_open,
5600 venue,
5601 instrument_id,
5602 strategy_id,
5603 account_id,
5604 )
5605 }
5606
5607 #[must_use]
5609 pub fn client_order_ids_closed_view(
5610 &self,
5611 venue: Option<&Venue>,
5612 instrument_id: Option<&InstrumentId>,
5613 strategy_id: Option<&StrategyId>,
5614 account_id: Option<&AccountId>,
5615 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5616 self.view_orders_in_bucket(
5617 &self.index.orders_closed,
5618 venue,
5619 instrument_id,
5620 strategy_id,
5621 account_id,
5622 )
5623 }
5624
5625 #[must_use]
5627 pub fn client_order_ids_active_local_view(
5628 &self,
5629 venue: Option<&Venue>,
5630 instrument_id: Option<&InstrumentId>,
5631 strategy_id: Option<&StrategyId>,
5632 account_id: Option<&AccountId>,
5633 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5634 self.view_orders_in_bucket(
5635 &self.index.orders_active_local,
5636 venue,
5637 instrument_id,
5638 strategy_id,
5639 account_id,
5640 )
5641 }
5642
5643 #[must_use]
5645 pub fn client_order_ids_emulated_view(
5646 &self,
5647 venue: Option<&Venue>,
5648 instrument_id: Option<&InstrumentId>,
5649 strategy_id: Option<&StrategyId>,
5650 account_id: Option<&AccountId>,
5651 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5652 self.view_orders_in_bucket(
5653 &self.index.orders_emulated,
5654 venue,
5655 instrument_id,
5656 strategy_id,
5657 account_id,
5658 )
5659 }
5660
5661 #[must_use]
5663 pub fn client_order_ids_inflight_view(
5664 &self,
5665 venue: Option<&Venue>,
5666 instrument_id: Option<&InstrumentId>,
5667 strategy_id: Option<&StrategyId>,
5668 account_id: Option<&AccountId>,
5669 ) -> Cow<'_, AHashSet<ClientOrderId>> {
5670 self.view_orders_in_bucket(
5671 &self.index.orders_inflight,
5672 venue,
5673 instrument_id,
5674 strategy_id,
5675 account_id,
5676 )
5677 }
5678
5679 #[must_use]
5681 pub fn position_ids_view(
5682 &self,
5683 venue: Option<&Venue>,
5684 instrument_id: Option<&InstrumentId>,
5685 strategy_id: Option<&StrategyId>,
5686 account_id: Option<&AccountId>,
5687 ) -> Cow<'_, AHashSet<PositionId>> {
5688 self.view_positions_in_bucket(
5689 &self.index.positions,
5690 venue,
5691 instrument_id,
5692 strategy_id,
5693 account_id,
5694 )
5695 }
5696
5697 #[must_use]
5699 pub fn position_open_ids_view(
5700 &self,
5701 venue: Option<&Venue>,
5702 instrument_id: Option<&InstrumentId>,
5703 strategy_id: Option<&StrategyId>,
5704 account_id: Option<&AccountId>,
5705 ) -> Cow<'_, AHashSet<PositionId>> {
5706 self.view_positions_in_bucket(
5707 &self.index.positions_open,
5708 venue,
5709 instrument_id,
5710 strategy_id,
5711 account_id,
5712 )
5713 }
5714
5715 #[must_use]
5717 pub fn position_closed_ids_view(
5718 &self,
5719 venue: Option<&Venue>,
5720 instrument_id: Option<&InstrumentId>,
5721 strategy_id: Option<&StrategyId>,
5722 account_id: Option<&AccountId>,
5723 ) -> Cow<'_, AHashSet<PositionId>> {
5724 self.view_positions_in_bucket(
5725 &self.index.positions_closed,
5726 venue,
5727 instrument_id,
5728 strategy_id,
5729 account_id,
5730 )
5731 }
5732
5733 pub fn iter_client_order_ids(
5739 &self,
5740 venue: Option<&Venue>,
5741 instrument_id: Option<&InstrumentId>,
5742 strategy_id: Option<&StrategyId>,
5743 account_id: Option<&AccountId>,
5744 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5745 self.iter_orders_in_bucket(
5746 &self.index.orders,
5747 venue,
5748 instrument_id,
5749 strategy_id,
5750 account_id,
5751 )
5752 }
5753
5754 pub fn iter_client_order_ids_open(
5756 &self,
5757 venue: Option<&Venue>,
5758 instrument_id: Option<&InstrumentId>,
5759 strategy_id: Option<&StrategyId>,
5760 account_id: Option<&AccountId>,
5761 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5762 self.iter_orders_in_bucket(
5763 &self.index.orders_open,
5764 venue,
5765 instrument_id,
5766 strategy_id,
5767 account_id,
5768 )
5769 }
5770
5771 pub fn iter_client_order_ids_closed(
5773 &self,
5774 venue: Option<&Venue>,
5775 instrument_id: Option<&InstrumentId>,
5776 strategy_id: Option<&StrategyId>,
5777 account_id: Option<&AccountId>,
5778 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5779 self.iter_orders_in_bucket(
5780 &self.index.orders_closed,
5781 venue,
5782 instrument_id,
5783 strategy_id,
5784 account_id,
5785 )
5786 }
5787
5788 pub fn iter_client_order_ids_active_local(
5790 &self,
5791 venue: Option<&Venue>,
5792 instrument_id: Option<&InstrumentId>,
5793 strategy_id: Option<&StrategyId>,
5794 account_id: Option<&AccountId>,
5795 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5796 self.iter_orders_in_bucket(
5797 &self.index.orders_active_local,
5798 venue,
5799 instrument_id,
5800 strategy_id,
5801 account_id,
5802 )
5803 }
5804
5805 pub fn iter_client_order_ids_emulated(
5807 &self,
5808 venue: Option<&Venue>,
5809 instrument_id: Option<&InstrumentId>,
5810 strategy_id: Option<&StrategyId>,
5811 account_id: Option<&AccountId>,
5812 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5813 self.iter_orders_in_bucket(
5814 &self.index.orders_emulated,
5815 venue,
5816 instrument_id,
5817 strategy_id,
5818 account_id,
5819 )
5820 }
5821
5822 pub fn iter_client_order_ids_inflight(
5824 &self,
5825 venue: Option<&Venue>,
5826 instrument_id: Option<&InstrumentId>,
5827 strategy_id: Option<&StrategyId>,
5828 account_id: Option<&AccountId>,
5829 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5830 self.iter_orders_in_bucket(
5831 &self.index.orders_inflight,
5832 venue,
5833 instrument_id,
5834 strategy_id,
5835 account_id,
5836 )
5837 }
5838
5839 pub fn iter_position_ids(
5841 &self,
5842 venue: Option<&Venue>,
5843 instrument_id: Option<&InstrumentId>,
5844 strategy_id: Option<&StrategyId>,
5845 account_id: Option<&AccountId>,
5846 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5847 self.iter_positions_in_bucket(
5848 &self.index.positions,
5849 venue,
5850 instrument_id,
5851 strategy_id,
5852 account_id,
5853 )
5854 }
5855
5856 pub fn iter_position_open_ids(
5858 &self,
5859 venue: Option<&Venue>,
5860 instrument_id: Option<&InstrumentId>,
5861 strategy_id: Option<&StrategyId>,
5862 account_id: Option<&AccountId>,
5863 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5864 self.iter_positions_in_bucket(
5865 &self.index.positions_open,
5866 venue,
5867 instrument_id,
5868 strategy_id,
5869 account_id,
5870 )
5871 }
5872
5873 pub fn iter_position_closed_ids(
5875 &self,
5876 venue: Option<&Venue>,
5877 instrument_id: Option<&InstrumentId>,
5878 strategy_id: Option<&StrategyId>,
5879 account_id: Option<&AccountId>,
5880 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5881 self.iter_positions_in_bucket(
5882 &self.index.positions_closed,
5883 venue,
5884 instrument_id,
5885 strategy_id,
5886 account_id,
5887 )
5888 }
5889
5890 #[must_use]
5892 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
5893 self.index.actors.clone()
5894 }
5895
5896 #[must_use]
5898 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
5899 self.index.strategies.clone()
5900 }
5901
5902 #[must_use]
5904 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
5905 self.index.exec_algorithms.clone()
5906 }
5907
5908 #[must_use]
5917 pub fn order_ref(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
5918 self.orders
5919 .get(client_order_id)
5920 .map(|order_cell| OrderRef::new(order_cell.borrow()))
5921 }
5922
5923 #[must_use]
5927 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
5928 self.order_ref(client_order_id)
5929 }
5930
5931 pub fn try_order_ref(
5937 &self,
5938 client_order_id: &ClientOrderId,
5939 ) -> Result<OrderRef<'_>, OrderLookupError> {
5940 self.orders
5941 .get(client_order_id)
5942 .map(|order_cell| OrderRef::new(order_cell.borrow()))
5943 .ok_or_else(|| OrderLookupError::not_found(*client_order_id))
5944 }
5945
5946 pub fn try_order(
5954 &self,
5955 client_order_id: &ClientOrderId,
5956 ) -> Result<OrderRef<'_>, OrderLookupError> {
5957 self.try_order_ref(client_order_id)
5958 }
5959
5960 #[must_use]
5970 pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
5971 self.orders
5972 .get(client_order_id)
5973 .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
5974 }
5975
5976 #[must_use]
5981 pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
5982 self.orders
5983 .get(client_order_id)
5984 .map(|order_cell| order_cell.borrow().clone())
5985 }
5986
5987 pub fn try_order_owned(
5993 &self,
5994 client_order_id: &ClientOrderId,
5995 ) -> Result<OrderAny, OrderLookupError> {
5996 self.try_order_ref(client_order_id)
5997 .map(|order| order.cloned())
5998 }
5999
6000 #[must_use]
6002 pub fn orders_for_ids(
6003 &self,
6004 client_order_ids: &[ClientOrderId],
6005 context: &dyn Display,
6006 ) -> Vec<OrderAny> {
6007 let mut orders = Vec::with_capacity(client_order_ids.len());
6008 for id in client_order_ids {
6009 match self.orders.get(id) {
6010 Some(order_cell) => orders.push(order_cell.borrow().clone()),
6011 None => log::error!("Order {id} not found in cache for {context}"),
6012 }
6013 }
6014 orders
6015 }
6016
6017 #[must_use]
6019 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
6020 self.index.venue_order_ids.get(venue_order_id)
6021 }
6022
6023 #[must_use]
6025 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
6026 self.index.client_order_ids.get(client_order_id)
6027 }
6028
6029 #[must_use]
6031 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
6032 self.index.order_client.get(client_order_id)
6033 }
6034
6035 #[must_use]
6041 pub fn orders_refs(
6042 &self,
6043 venue: Option<&Venue>,
6044 instrument_id: Option<&InstrumentId>,
6045 strategy_id: Option<&StrategyId>,
6046 account_id: Option<&AccountId>,
6047 side: Option<OrderSide>,
6048 ) -> Vec<OrderRef<'_>> {
6049 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
6050 self.get_orders_for_ids(&client_order_ids, side)
6051 }
6052
6053 #[must_use]
6057 pub fn orders(
6058 &self,
6059 venue: Option<&Venue>,
6060 instrument_id: Option<&InstrumentId>,
6061 strategy_id: Option<&StrategyId>,
6062 account_id: Option<&AccountId>,
6063 side: Option<OrderSide>,
6064 ) -> Vec<OrderRef<'_>> {
6065 self.orders_refs(venue, instrument_id, strategy_id, account_id, side)
6066 }
6067
6068 #[must_use]
6070 pub fn orders_open_refs(
6071 &self,
6072 venue: Option<&Venue>,
6073 instrument_id: Option<&InstrumentId>,
6074 strategy_id: Option<&StrategyId>,
6075 account_id: Option<&AccountId>,
6076 side: Option<OrderSide>,
6077 ) -> Vec<OrderRef<'_>> {
6078 let client_order_ids =
6079 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
6080 self.get_orders_for_ids(&client_order_ids, side)
6081 }
6082
6083 #[must_use]
6087 pub fn orders_open(
6088 &self,
6089 venue: Option<&Venue>,
6090 instrument_id: Option<&InstrumentId>,
6091 strategy_id: Option<&StrategyId>,
6092 account_id: Option<&AccountId>,
6093 side: Option<OrderSide>,
6094 ) -> Vec<OrderRef<'_>> {
6095 self.orders_open_refs(venue, instrument_id, strategy_id, account_id, side)
6096 }
6097
6098 #[must_use]
6100 pub fn orders_closed_refs(
6101 &self,
6102 venue: Option<&Venue>,
6103 instrument_id: Option<&InstrumentId>,
6104 strategy_id: Option<&StrategyId>,
6105 account_id: Option<&AccountId>,
6106 side: Option<OrderSide>,
6107 ) -> Vec<OrderRef<'_>> {
6108 let client_order_ids =
6109 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
6110 self.get_orders_for_ids(&client_order_ids, side)
6111 }
6112
6113 #[must_use]
6117 pub fn orders_closed(
6118 &self,
6119 venue: Option<&Venue>,
6120 instrument_id: Option<&InstrumentId>,
6121 strategy_id: Option<&StrategyId>,
6122 account_id: Option<&AccountId>,
6123 side: Option<OrderSide>,
6124 ) -> Vec<OrderRef<'_>> {
6125 self.orders_closed_refs(venue, instrument_id, strategy_id, account_id, side)
6126 }
6127
6128 #[must_use]
6133 pub fn orders_active_local_refs(
6134 &self,
6135 venue: Option<&Venue>,
6136 instrument_id: Option<&InstrumentId>,
6137 strategy_id: Option<&StrategyId>,
6138 account_id: Option<&AccountId>,
6139 side: Option<OrderSide>,
6140 ) -> Vec<OrderRef<'_>> {
6141 let client_order_ids =
6142 self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
6143 self.get_orders_for_ids(&client_order_ids, side)
6144 }
6145
6146 #[must_use]
6150 pub fn orders_active_local(
6151 &self,
6152 venue: Option<&Venue>,
6153 instrument_id: Option<&InstrumentId>,
6154 strategy_id: Option<&StrategyId>,
6155 account_id: Option<&AccountId>,
6156 side: Option<OrderSide>,
6157 ) -> Vec<OrderRef<'_>> {
6158 self.orders_active_local_refs(venue, instrument_id, strategy_id, account_id, side)
6159 }
6160
6161 #[must_use]
6163 pub fn orders_emulated_refs(
6164 &self,
6165 venue: Option<&Venue>,
6166 instrument_id: Option<&InstrumentId>,
6167 strategy_id: Option<&StrategyId>,
6168 account_id: Option<&AccountId>,
6169 side: Option<OrderSide>,
6170 ) -> Vec<OrderRef<'_>> {
6171 let client_order_ids =
6172 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
6173 self.get_orders_for_ids(&client_order_ids, side)
6174 }
6175
6176 #[must_use]
6180 pub fn orders_emulated(
6181 &self,
6182 venue: Option<&Venue>,
6183 instrument_id: Option<&InstrumentId>,
6184 strategy_id: Option<&StrategyId>,
6185 account_id: Option<&AccountId>,
6186 side: Option<OrderSide>,
6187 ) -> Vec<OrderRef<'_>> {
6188 self.orders_emulated_refs(venue, instrument_id, strategy_id, account_id, side)
6189 }
6190
6191 #[must_use]
6193 pub fn orders_inflight_refs(
6194 &self,
6195 venue: Option<&Venue>,
6196 instrument_id: Option<&InstrumentId>,
6197 strategy_id: Option<&StrategyId>,
6198 account_id: Option<&AccountId>,
6199 side: Option<OrderSide>,
6200 ) -> Vec<OrderRef<'_>> {
6201 let client_order_ids =
6202 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
6203 self.get_orders_for_ids(&client_order_ids, side)
6204 }
6205
6206 #[must_use]
6210 pub fn orders_inflight(
6211 &self,
6212 venue: Option<&Venue>,
6213 instrument_id: Option<&InstrumentId>,
6214 strategy_id: Option<&StrategyId>,
6215 account_id: Option<&AccountId>,
6216 side: Option<OrderSide>,
6217 ) -> Vec<OrderRef<'_>> {
6218 self.orders_inflight_refs(venue, instrument_id, strategy_id, account_id, side)
6219 }
6220
6221 #[must_use]
6223 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
6224 match self.index.position_orders.get(position_id) {
6225 Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
6226 None => Vec::new(),
6227 }
6228 }
6229
6230 #[must_use]
6232 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
6233 self.index.orders.contains(client_order_id)
6234 }
6235
6236 #[must_use]
6238 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
6239 self.index.orders_open.contains(client_order_id)
6240 }
6241
6242 #[must_use]
6244 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
6245 self.index.orders_closed.contains(client_order_id)
6246 }
6247
6248 #[must_use]
6253 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
6254 self.index.orders_active_local.contains(client_order_id)
6255 }
6256
6257 #[must_use]
6259 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
6260 self.index.orders_emulated.contains(client_order_id)
6261 }
6262
6263 #[must_use]
6265 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
6266 self.index.orders_inflight.contains(client_order_id)
6267 }
6268
6269 #[must_use]
6271 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
6272 self.index.orders_pending_cancel.contains(client_order_id)
6273 }
6274
6275 #[must_use]
6277 pub fn orders_open_count(
6278 &self,
6279 venue: Option<&Venue>,
6280 instrument_id: Option<&InstrumentId>,
6281 strategy_id: Option<&StrategyId>,
6282 account_id: Option<&AccountId>,
6283 side: Option<OrderSide>,
6284 ) -> usize {
6285 self.count_orders_in_bucket(
6286 &self.index.orders_open,
6287 venue,
6288 instrument_id,
6289 strategy_id,
6290 account_id,
6291 side,
6292 )
6293 }
6294
6295 #[must_use]
6297 pub fn orders_closed_count(
6298 &self,
6299 venue: Option<&Venue>,
6300 instrument_id: Option<&InstrumentId>,
6301 strategy_id: Option<&StrategyId>,
6302 account_id: Option<&AccountId>,
6303 side: Option<OrderSide>,
6304 ) -> usize {
6305 self.count_orders_in_bucket(
6306 &self.index.orders_closed,
6307 venue,
6308 instrument_id,
6309 strategy_id,
6310 account_id,
6311 side,
6312 )
6313 }
6314
6315 #[must_use]
6320 pub fn orders_active_local_count(
6321 &self,
6322 venue: Option<&Venue>,
6323 instrument_id: Option<&InstrumentId>,
6324 strategy_id: Option<&StrategyId>,
6325 account_id: Option<&AccountId>,
6326 side: Option<OrderSide>,
6327 ) -> usize {
6328 self.count_orders_in_bucket(
6329 &self.index.orders_active_local,
6330 venue,
6331 instrument_id,
6332 strategy_id,
6333 account_id,
6334 side,
6335 )
6336 }
6337
6338 #[must_use]
6340 pub fn orders_emulated_count(
6341 &self,
6342 venue: Option<&Venue>,
6343 instrument_id: Option<&InstrumentId>,
6344 strategy_id: Option<&StrategyId>,
6345 account_id: Option<&AccountId>,
6346 side: Option<OrderSide>,
6347 ) -> usize {
6348 self.count_orders_in_bucket(
6349 &self.index.orders_emulated,
6350 venue,
6351 instrument_id,
6352 strategy_id,
6353 account_id,
6354 side,
6355 )
6356 }
6357
6358 #[must_use]
6360 pub fn orders_inflight_count(
6361 &self,
6362 venue: Option<&Venue>,
6363 instrument_id: Option<&InstrumentId>,
6364 strategy_id: Option<&StrategyId>,
6365 account_id: Option<&AccountId>,
6366 side: Option<OrderSide>,
6367 ) -> usize {
6368 self.count_orders_in_bucket(
6369 &self.index.orders_inflight,
6370 venue,
6371 instrument_id,
6372 strategy_id,
6373 account_id,
6374 side,
6375 )
6376 }
6377
6378 #[must_use]
6380 pub fn orders_total_count(
6381 &self,
6382 venue: Option<&Venue>,
6383 instrument_id: Option<&InstrumentId>,
6384 strategy_id: Option<&StrategyId>,
6385 account_id: Option<&AccountId>,
6386 side: Option<OrderSide>,
6387 ) -> usize {
6388 self.count_orders_in_bucket(
6389 &self.index.orders,
6390 venue,
6391 instrument_id,
6392 strategy_id,
6393 account_id,
6394 side,
6395 )
6396 }
6397
6398 #[must_use]
6404 pub fn has_orders_open(
6405 &self,
6406 venue: Option<&Venue>,
6407 instrument_id: Option<&InstrumentId>,
6408 strategy_id: Option<&StrategyId>,
6409 account_id: Option<&AccountId>,
6410 side: Option<OrderSide>,
6411 ) -> bool {
6412 self.any_orders_in_bucket(
6413 &self.index.orders_open,
6414 venue,
6415 instrument_id,
6416 strategy_id,
6417 account_id,
6418 side,
6419 )
6420 }
6421
6422 #[must_use]
6424 pub fn has_orders_closed(
6425 &self,
6426 venue: Option<&Venue>,
6427 instrument_id: Option<&InstrumentId>,
6428 strategy_id: Option<&StrategyId>,
6429 account_id: Option<&AccountId>,
6430 side: Option<OrderSide>,
6431 ) -> bool {
6432 self.any_orders_in_bucket(
6433 &self.index.orders_closed,
6434 venue,
6435 instrument_id,
6436 strategy_id,
6437 account_id,
6438 side,
6439 )
6440 }
6441
6442 #[must_use]
6446 pub fn has_orders_active_local(
6447 &self,
6448 venue: Option<&Venue>,
6449 instrument_id: Option<&InstrumentId>,
6450 strategy_id: Option<&StrategyId>,
6451 account_id: Option<&AccountId>,
6452 side: Option<OrderSide>,
6453 ) -> bool {
6454 self.any_orders_in_bucket(
6455 &self.index.orders_active_local,
6456 venue,
6457 instrument_id,
6458 strategy_id,
6459 account_id,
6460 side,
6461 )
6462 }
6463
6464 #[must_use]
6466 pub fn has_orders_emulated(
6467 &self,
6468 venue: Option<&Venue>,
6469 instrument_id: Option<&InstrumentId>,
6470 strategy_id: Option<&StrategyId>,
6471 account_id: Option<&AccountId>,
6472 side: Option<OrderSide>,
6473 ) -> bool {
6474 self.any_orders_in_bucket(
6475 &self.index.orders_emulated,
6476 venue,
6477 instrument_id,
6478 strategy_id,
6479 account_id,
6480 side,
6481 )
6482 }
6483
6484 #[must_use]
6486 pub fn has_orders_inflight(
6487 &self,
6488 venue: Option<&Venue>,
6489 instrument_id: Option<&InstrumentId>,
6490 strategy_id: Option<&StrategyId>,
6491 account_id: Option<&AccountId>,
6492 side: Option<OrderSide>,
6493 ) -> bool {
6494 self.any_orders_in_bucket(
6495 &self.index.orders_inflight,
6496 venue,
6497 instrument_id,
6498 strategy_id,
6499 account_id,
6500 side,
6501 )
6502 }
6503
6504 #[must_use]
6506 pub fn has_orders(
6507 &self,
6508 venue: Option<&Venue>,
6509 instrument_id: Option<&InstrumentId>,
6510 strategy_id: Option<&StrategyId>,
6511 account_id: Option<&AccountId>,
6512 side: Option<OrderSide>,
6513 ) -> bool {
6514 self.any_orders_in_bucket(
6515 &self.index.orders,
6516 venue,
6517 instrument_id,
6518 strategy_id,
6519 account_id,
6520 side,
6521 )
6522 }
6523
6524 #[must_use]
6526 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
6527 self.order_lists.get(order_list_id)
6528 }
6529
6530 pub fn try_order_list(
6536 &self,
6537 order_list_id: &OrderListId,
6538 ) -> Result<&OrderList, OrderListLookupError> {
6539 self.order_lists
6540 .get(order_list_id)
6541 .ok_or_else(|| OrderListLookupError::not_found(*order_list_id))
6542 }
6543
6544 #[must_use]
6546 pub fn order_lists(
6547 &self,
6548 venue: Option<&Venue>,
6549 instrument_id: Option<&InstrumentId>,
6550 strategy_id: Option<&StrategyId>,
6551 account_id: Option<&AccountId>,
6552 ) -> Vec<&OrderList> {
6553 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
6554
6555 if let Some(venue) = venue {
6556 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
6557 }
6558
6559 if let Some(instrument_id) = instrument_id {
6560 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
6561 }
6562
6563 if let Some(strategy_id) = strategy_id {
6564 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
6565 }
6566
6567 if let Some(account_id) = account_id {
6568 order_lists.retain(|ol| {
6569 ol.client_order_ids.iter().any(|client_order_id| {
6570 self.orders.get(client_order_id).is_some_and(|order_cell| {
6571 order_cell.borrow().account_id().as_ref() == Some(account_id)
6572 })
6573 })
6574 });
6575 }
6576
6577 order_lists
6578 }
6579
6580 #[must_use]
6582 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
6583 self.order_lists.contains_key(order_list_id)
6584 }
6585
6586 #[must_use]
6591 pub fn orders_for_exec_algorithm(
6592 &self,
6593 exec_algorithm_id: &ExecAlgorithmId,
6594 venue: Option<&Venue>,
6595 instrument_id: Option<&InstrumentId>,
6596 strategy_id: Option<&StrategyId>,
6597 account_id: Option<&AccountId>,
6598 side: Option<OrderSide>,
6599 ) -> Vec<OrderRef<'_>> {
6600 let Some(exec_algorithm_order_ids) =
6601 self.index.exec_algorithm_orders.get(exec_algorithm_id)
6602 else {
6603 return Vec::new();
6604 };
6605
6606 let filtered = self.query_orders_in_bucket(
6607 exec_algorithm_order_ids,
6608 venue,
6609 instrument_id,
6610 strategy_id,
6611 account_id,
6612 );
6613 self.get_orders_for_ids(&filtered, side)
6614 }
6615
6616 #[must_use]
6618 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
6619 match self.index.exec_spawn_orders.get(exec_spawn_id) {
6620 Some(ids) => self.get_orders_for_ids(ids, None),
6621 None => Vec::new(),
6622 }
6623 }
6624
6625 #[must_use]
6627 pub fn exec_spawn_total_quantity(
6628 &self,
6629 exec_spawn_id: &ClientOrderId,
6630 active_only: bool,
6631 ) -> Option<Quantity> {
6632 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6633
6634 let mut total_quantity: Option<Quantity> = None;
6635
6636 for spawn_order in exec_spawn_orders {
6637 if active_only && spawn_order.is_closed() {
6638 continue;
6639 }
6640
6641 match total_quantity.as_mut() {
6642 Some(total) => *total = *total + spawn_order.quantity(),
6643 None => total_quantity = Some(spawn_order.quantity()),
6644 }
6645 }
6646
6647 total_quantity
6648 }
6649
6650 #[must_use]
6652 pub fn exec_spawn_total_filled_qty(
6653 &self,
6654 exec_spawn_id: &ClientOrderId,
6655 active_only: bool,
6656 ) -> Option<Quantity> {
6657 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6658
6659 let mut total_quantity: Option<Quantity> = None;
6660
6661 for spawn_order in exec_spawn_orders {
6662 if active_only && spawn_order.is_closed() {
6663 continue;
6664 }
6665
6666 match total_quantity.as_mut() {
6667 Some(total) => *total = *total + spawn_order.filled_qty(),
6668 None => total_quantity = Some(spawn_order.filled_qty()),
6669 }
6670 }
6671
6672 total_quantity
6673 }
6674
6675 #[must_use]
6677 pub fn exec_spawn_total_leaves_qty(
6678 &self,
6679 exec_spawn_id: &ClientOrderId,
6680 active_only: bool,
6681 ) -> Option<Quantity> {
6682 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6683
6684 let mut total_quantity: Option<Quantity> = None;
6685
6686 for spawn_order in exec_spawn_orders {
6687 if active_only && spawn_order.is_closed() {
6688 continue;
6689 }
6690
6691 match total_quantity.as_mut() {
6692 Some(total) => *total = *total + spawn_order.leaves_qty(),
6693 None => total_quantity = Some(spawn_order.leaves_qty()),
6694 }
6695 }
6696
6697 total_quantity
6698 }
6699
6700 #[must_use]
6704 pub fn position_ref(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
6705 self.positions
6706 .get(position_id)
6707 .map(|position_cell| PositionRef::new(position_cell.borrow()))
6708 }
6709
6710 #[must_use]
6714 pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
6715 self.position_ref(position_id)
6716 }
6717
6718 pub fn try_position_ref(
6724 &self,
6725 position_id: &PositionId,
6726 ) -> Result<PositionRef<'_>, PositionLookupError> {
6727 self.positions
6728 .get(position_id)
6729 .map(|position_cell| PositionRef::new(position_cell.borrow()))
6730 .ok_or_else(|| PositionLookupError::not_found(*position_id))
6731 }
6732
6733 pub fn try_position(
6741 &self,
6742 position_id: &PositionId,
6743 ) -> Result<PositionRef<'_>, PositionLookupError> {
6744 self.try_position_ref(position_id)
6745 }
6746
6747 #[must_use]
6757 pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
6758 self.positions
6759 .get(position_id)
6760 .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
6761 }
6762
6763 #[must_use]
6768 pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
6769 self.positions
6770 .get(position_id)
6771 .map(|position_cell| position_cell.borrow().clone())
6772 }
6773
6774 #[must_use]
6776 pub fn position_for_order_ref(
6777 &self,
6778 client_order_id: &ClientOrderId,
6779 ) -> Option<PositionRef<'_>> {
6780 self.index
6781 .order_position
6782 .get(client_order_id)
6783 .and_then(|position_id| self.positions.get(position_id))
6784 .map(|position_cell| PositionRef::new(position_cell.borrow()))
6785 }
6786
6787 #[must_use]
6791 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
6792 self.position_for_order_ref(client_order_id)
6793 }
6794
6795 #[must_use]
6797 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
6798 self.index.order_position.get(client_order_id)
6799 }
6800
6801 #[must_use]
6807 pub fn positions_refs(
6808 &self,
6809 venue: Option<&Venue>,
6810 instrument_id: Option<&InstrumentId>,
6811 strategy_id: Option<&StrategyId>,
6812 account_id: Option<&AccountId>,
6813 side: Option<PositionSide>,
6814 ) -> Vec<PositionRef<'_>> {
6815 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
6816 self.get_positions_for_ids(&position_ids, side)
6817 }
6818
6819 #[must_use]
6823 pub fn positions(
6824 &self,
6825 venue: Option<&Venue>,
6826 instrument_id: Option<&InstrumentId>,
6827 strategy_id: Option<&StrategyId>,
6828 account_id: Option<&AccountId>,
6829 side: Option<PositionSide>,
6830 ) -> Vec<PositionRef<'_>> {
6831 self.positions_refs(venue, instrument_id, strategy_id, account_id, side)
6832 }
6833
6834 #[must_use]
6836 pub fn positions_open_refs(
6837 &self,
6838 venue: Option<&Venue>,
6839 instrument_id: Option<&InstrumentId>,
6840 strategy_id: Option<&StrategyId>,
6841 account_id: Option<&AccountId>,
6842 side: Option<PositionSide>,
6843 ) -> Vec<PositionRef<'_>> {
6844 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
6845 self.get_positions_for_ids(&position_ids, side)
6846 }
6847
6848 #[must_use]
6852 pub fn positions_open(
6853 &self,
6854 venue: Option<&Venue>,
6855 instrument_id: Option<&InstrumentId>,
6856 strategy_id: Option<&StrategyId>,
6857 account_id: Option<&AccountId>,
6858 side: Option<PositionSide>,
6859 ) -> Vec<PositionRef<'_>> {
6860 self.positions_open_refs(venue, instrument_id, strategy_id, account_id, side)
6861 }
6862
6863 #[must_use]
6865 pub fn positions_closed_refs(
6866 &self,
6867 venue: Option<&Venue>,
6868 instrument_id: Option<&InstrumentId>,
6869 strategy_id: Option<&StrategyId>,
6870 account_id: Option<&AccountId>,
6871 side: Option<PositionSide>,
6872 ) -> Vec<PositionRef<'_>> {
6873 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
6874 self.get_positions_for_ids(&position_ids, side)
6875 }
6876
6877 #[must_use]
6881 pub fn positions_closed(
6882 &self,
6883 venue: Option<&Venue>,
6884 instrument_id: Option<&InstrumentId>,
6885 strategy_id: Option<&StrategyId>,
6886 account_id: Option<&AccountId>,
6887 side: Option<PositionSide>,
6888 ) -> Vec<PositionRef<'_>> {
6889 self.positions_closed_refs(venue, instrument_id, strategy_id, account_id, side)
6890 }
6891
6892 #[must_use]
6894 pub fn position_exists(&self, position_id: &PositionId) -> bool {
6895 self.index.positions.contains(position_id)
6896 }
6897
6898 #[must_use]
6900 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
6901 self.index.positions_open.contains(position_id)
6902 }
6903
6904 #[must_use]
6906 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
6907 self.index.positions_closed.contains(position_id)
6908 }
6909
6910 #[must_use]
6912 pub fn positions_open_count(
6913 &self,
6914 venue: Option<&Venue>,
6915 instrument_id: Option<&InstrumentId>,
6916 strategy_id: Option<&StrategyId>,
6917 account_id: Option<&AccountId>,
6918 side: Option<PositionSide>,
6919 ) -> usize {
6920 self.count_positions_in_bucket(
6921 &self.index.positions_open,
6922 venue,
6923 instrument_id,
6924 strategy_id,
6925 account_id,
6926 side,
6927 )
6928 }
6929
6930 #[must_use]
6932 pub fn positions_closed_count(
6933 &self,
6934 venue: Option<&Venue>,
6935 instrument_id: Option<&InstrumentId>,
6936 strategy_id: Option<&StrategyId>,
6937 account_id: Option<&AccountId>,
6938 side: Option<PositionSide>,
6939 ) -> usize {
6940 self.count_positions_in_bucket(
6941 &self.index.positions_closed,
6942 venue,
6943 instrument_id,
6944 strategy_id,
6945 account_id,
6946 side,
6947 )
6948 }
6949
6950 #[must_use]
6952 pub fn positions_total_count(
6953 &self,
6954 venue: Option<&Venue>,
6955 instrument_id: Option<&InstrumentId>,
6956 strategy_id: Option<&StrategyId>,
6957 account_id: Option<&AccountId>,
6958 side: Option<PositionSide>,
6959 ) -> usize {
6960 self.count_positions_in_bucket(
6961 &self.index.positions,
6962 venue,
6963 instrument_id,
6964 strategy_id,
6965 account_id,
6966 side,
6967 )
6968 }
6969
6970 #[must_use]
6976 pub fn has_positions_open(
6977 &self,
6978 venue: Option<&Venue>,
6979 instrument_id: Option<&InstrumentId>,
6980 strategy_id: Option<&StrategyId>,
6981 account_id: Option<&AccountId>,
6982 side: Option<PositionSide>,
6983 ) -> bool {
6984 self.any_positions_in_bucket(
6985 &self.index.positions_open,
6986 venue,
6987 instrument_id,
6988 strategy_id,
6989 account_id,
6990 side,
6991 )
6992 }
6993
6994 #[must_use]
6996 pub fn has_positions_closed(
6997 &self,
6998 venue: Option<&Venue>,
6999 instrument_id: Option<&InstrumentId>,
7000 strategy_id: Option<&StrategyId>,
7001 account_id: Option<&AccountId>,
7002 side: Option<PositionSide>,
7003 ) -> bool {
7004 self.any_positions_in_bucket(
7005 &self.index.positions_closed,
7006 venue,
7007 instrument_id,
7008 strategy_id,
7009 account_id,
7010 side,
7011 )
7012 }
7013
7014 #[must_use]
7016 pub fn has_positions(
7017 &self,
7018 venue: Option<&Venue>,
7019 instrument_id: Option<&InstrumentId>,
7020 strategy_id: Option<&StrategyId>,
7021 account_id: Option<&AccountId>,
7022 side: Option<PositionSide>,
7023 ) -> bool {
7024 self.any_positions_in_bucket(
7025 &self.index.positions,
7026 venue,
7027 instrument_id,
7028 strategy_id,
7029 account_id,
7030 side,
7031 )
7032 }
7033
7034 #[must_use]
7038 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
7039 self.index.order_strategy.get(client_order_id)
7040 }
7041
7042 #[must_use]
7044 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
7045 self.index.position_strategy.get(position_id)
7046 }
7047
7048 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
7056 check_valid_string_ascii(key, stringify!(key))?;
7057
7058 Ok(self.general.get(key))
7059 }
7060
7061 #[must_use]
7070 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
7071 match price_type {
7072 PriceType::Bid => self
7073 .quotes
7074 .get(instrument_id)
7075 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
7076 PriceType::Ask => self
7077 .quotes
7078 .get(instrument_id)
7079 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
7080 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
7081 quotes.front().map(|quote| {
7082 let mid = (quote.ask_price.as_decimal() + quote.bid_price.as_decimal())
7083 / Decimal::TWO;
7084
7085 Price::from_decimal_dp(mid, quote.bid_price.precision + 1)
7086 .expect("Invalid mid price for Cache::price")
7087 })
7088 }),
7089 PriceType::Last => self
7090 .trades
7091 .get(instrument_id)
7092 .and_then(|trades| trades.front().map(|trade| trade.price)),
7093 PriceType::Mark => self
7094 .mark_prices
7095 .get(instrument_id)
7096 .and_then(|marks| marks.front().map(|mark| mark.value)),
7097 }
7098 }
7099
7100 #[must_use]
7102 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
7103 self.quotes
7104 .get(instrument_id)
7105 .map(|quotes| quotes.iter().copied().collect())
7106 }
7107
7108 #[must_use]
7110 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
7111 self.trades
7112 .get(instrument_id)
7113 .map(|trades| trades.iter().copied().collect())
7114 }
7115
7116 #[must_use]
7118 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
7119 self.mark_prices
7120 .get(instrument_id)
7121 .map(|mark_prices| mark_prices.iter().copied().collect())
7122 }
7123
7124 #[must_use]
7126 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
7127 self.index_prices
7128 .get(instrument_id)
7129 .map(|index_prices| index_prices.iter().copied().collect())
7130 }
7131
7132 #[must_use]
7134 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
7135 self.funding_rates
7136 .get(instrument_id)
7137 .map(|funding_rates| funding_rates.iter().copied().collect())
7138 }
7139
7140 #[must_use]
7142 pub fn instrument_statuses(
7143 &self,
7144 instrument_id: &InstrumentId,
7145 ) -> Option<Vec<InstrumentStatus>> {
7146 self.instrument_statuses
7147 .get(instrument_id)
7148 .map(|statuses| statuses.iter().copied().collect())
7149 }
7150
7151 #[must_use]
7153 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
7154 self.bars
7155 .get(bar_type)
7156 .map(|bars| bars.iter().copied().collect())
7157 }
7158
7159 #[must_use]
7161 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
7162 self.books.get(instrument_id)
7163 }
7164
7165 pub fn try_order_book(
7171 &self,
7172 instrument_id: &InstrumentId,
7173 ) -> Result<&OrderBook, OrderBookLookupError> {
7174 self.books
7175 .get(instrument_id)
7176 .ok_or_else(|| OrderBookLookupError::not_found(*instrument_id))
7177 }
7178
7179 #[must_use]
7181 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
7182 self.books.get_mut(instrument_id)
7183 }
7184
7185 #[must_use]
7187 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
7188 self.own_books.get(instrument_id)
7189 }
7190
7191 pub fn try_own_order_book(
7198 &self,
7199 instrument_id: &InstrumentId,
7200 ) -> Result<&OwnOrderBook, OwnOrderBookLookupError> {
7201 self.own_books
7202 .get(instrument_id)
7203 .ok_or_else(|| OwnOrderBookLookupError::not_found(*instrument_id))
7204 }
7205
7206 #[must_use]
7208 pub fn own_order_book_mut(
7209 &mut self,
7210 instrument_id: &InstrumentId,
7211 ) -> Option<&mut OwnOrderBook> {
7212 self.own_books.get_mut(instrument_id)
7213 }
7214
7215 #[must_use]
7217 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
7218 self.quotes
7219 .get(instrument_id)
7220 .and_then(|quotes| quotes.front())
7221 }
7222
7223 #[must_use]
7227 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
7228 self.quotes
7229 .get(instrument_id)
7230 .and_then(|quotes| quotes.get(index))
7231 }
7232
7233 #[must_use]
7235 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
7236 self.trades
7237 .get(instrument_id)
7238 .and_then(|trades| trades.front())
7239 }
7240
7241 #[must_use]
7245 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
7246 self.trades
7247 .get(instrument_id)
7248 .and_then(|trades| trades.get(index))
7249 }
7250
7251 #[must_use]
7253 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
7254 self.mark_prices
7255 .get(instrument_id)
7256 .and_then(|mark_prices| mark_prices.front())
7257 }
7258
7259 #[must_use]
7261 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
7262 self.index_prices
7263 .get(instrument_id)
7264 .and_then(|index_prices| index_prices.front())
7265 }
7266
7267 #[must_use]
7269 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
7270 self.funding_rates
7271 .get(instrument_id)
7272 .and_then(|funding_rates| funding_rates.front())
7273 }
7274
7275 #[must_use]
7277 pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
7278 self.instrument_statuses
7279 .get(instrument_id)
7280 .and_then(|statuses| statuses.front())
7281 }
7282
7283 #[must_use]
7285 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
7286 self.bars.get(bar_type).and_then(|bars| bars.front())
7287 }
7288
7289 #[must_use]
7293 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
7294 self.bars.get(bar_type).and_then(|bars| bars.get(index))
7295 }
7296
7297 #[must_use]
7299 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
7300 self.books
7301 .get(instrument_id)
7302 .map_or(0, |book| book.update_count) as usize
7303 }
7304
7305 #[must_use]
7307 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
7308 self.quotes
7309 .get(instrument_id)
7310 .map_or(0, BoundedVecDeque::len)
7311 }
7312
7313 #[must_use]
7315 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
7316 self.trades
7317 .get(instrument_id)
7318 .map_or(0, BoundedVecDeque::len)
7319 }
7320
7321 #[must_use]
7323 pub fn bar_count(&self, bar_type: &BarType) -> usize {
7324 self.bars.get(bar_type).map_or(0, BoundedVecDeque::len)
7325 }
7326
7327 #[must_use]
7329 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
7330 self.books.contains_key(instrument_id)
7331 }
7332
7333 #[must_use]
7335 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
7336 self.quote_count(instrument_id) > 0
7337 }
7338
7339 #[must_use]
7341 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
7342 self.trade_count(instrument_id) > 0
7343 }
7344
7345 #[must_use]
7347 pub fn has_bars(&self, bar_type: &BarType) -> bool {
7348 self.bar_count(bar_type) > 0
7349 }
7350
7351 #[must_use]
7352 pub fn get_xrate(
7353 &self,
7354 venue: Venue,
7355 from_currency: Currency,
7356 to_currency: Currency,
7357 price_type: PriceType,
7358 ) -> Option<Decimal> {
7359 if from_currency == to_currency {
7360 return Some(Decimal::ONE);
7363 }
7364
7365 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
7366
7367 match get_exchange_rate(
7368 from_currency.code,
7369 to_currency.code,
7370 price_type,
7371 bid_quote,
7372 ask_quote,
7373 ) {
7374 Ok(rate) => rate,
7375 Err(e) => {
7376 log::error!("Failed to calculate xrate: {e}");
7377 None
7378 }
7379 }
7380 }
7381
7382 fn build_quote_table(
7383 &self,
7384 venue: &Venue,
7385 ) -> (AHashMap<Ustr, Decimal>, AHashMap<Ustr, Decimal>) {
7386 let mut bid_quotes = AHashMap::new();
7387 let mut ask_quotes = AHashMap::new();
7388
7389 for instrument_id in self.instruments.keys() {
7390 if instrument_id.venue != *venue {
7391 continue;
7392 }
7393
7394 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
7395 if let Some(tick) = ticks.front() {
7396 (tick.bid_price, tick.ask_price)
7397 } else {
7398 continue; }
7400 } else {
7401 let bid_bar = self
7402 .bars
7403 .iter()
7404 .find(|(k, _)| {
7405 k.instrument_id() == *instrument_id
7406 && matches!(k.spec().price_type, PriceType::Bid)
7407 })
7408 .map(|(_, v)| v);
7409
7410 let ask_bar = self
7411 .bars
7412 .iter()
7413 .find(|(k, _)| {
7414 k.instrument_id() == *instrument_id
7415 && matches!(k.spec().price_type, PriceType::Ask)
7416 })
7417 .map(|(_, v)| v);
7418
7419 match (bid_bar, ask_bar) {
7420 (Some(bid), Some(ask)) => {
7421 match (bid.front(), ask.front()) {
7422 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
7423 _ => {
7424 continue;
7426 }
7427 }
7428 }
7429 _ => continue,
7430 }
7431 };
7432
7433 bid_quotes.insert(instrument_id.symbol.inner(), bid_price.as_decimal());
7434 ask_quotes.insert(instrument_id.symbol.inner(), ask_price.as_decimal());
7435 }
7436
7437 (bid_quotes, ask_quotes)
7438 }
7439
7440 #[must_use]
7442 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
7443 self.mark_xrates.get(&(from_currency, to_currency)).copied()
7444 }
7445
7446 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
7452 assert!(xrate > 0.0, "xrate was zero");
7453 self.mark_xrates.insert((from_currency, to_currency), xrate);
7454 self.mark_xrates
7455 .insert((to_currency, from_currency), 1.0 / xrate);
7456 }
7457
7458 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
7460 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
7461 }
7462
7463 pub fn clear_mark_xrates(&mut self) {
7465 self.mark_xrates.clear();
7466 }
7467
7468 #[must_use]
7470 pub fn currency(&self, code: &Ustr) -> Option<&Currency> {
7471 self.currencies.get(code)
7472 }
7473
7474 pub fn try_currency(&self, code: &Ustr) -> Result<&Currency, CurrencyLookupError> {
7480 self.currencies
7481 .get(code)
7482 .ok_or_else(|| CurrencyLookupError::not_found(*code))
7483 }
7484
7485 #[must_use]
7489 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
7490 self.instruments.get(instrument_id)
7491 }
7492
7493 pub fn try_instrument(
7499 &self,
7500 instrument_id: &InstrumentId,
7501 ) -> Result<&InstrumentAny, InstrumentLookupError> {
7502 self.instruments
7503 .get(instrument_id)
7504 .ok_or_else(|| InstrumentLookupError::not_found(*instrument_id))
7505 }
7506
7507 #[must_use]
7509 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
7510 match venue {
7511 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
7512 None => self.instruments.keys().collect(),
7513 }
7514 }
7515
7516 #[must_use]
7518 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
7519 self.instruments
7520 .values()
7521 .filter(|i| &i.id().venue == venue)
7522 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
7523 .collect()
7524 }
7525
7526 #[must_use]
7533 pub fn instruments_by_parent(
7534 &self,
7535 venue: &Venue,
7536 root: &Ustr,
7537 class: InstrumentClass,
7538 ) -> Vec<&InstrumentAny> {
7539 self.instruments
7540 .values()
7541 .filter(|i| &i.id().venue == venue)
7542 .filter(|i| i.underlying() == Some(*root))
7543 .filter(|i| i.instrument_class() == class)
7544 .collect()
7545 }
7546
7547 #[must_use]
7549 pub fn bar_types(
7550 &self,
7551 instrument_id: Option<&InstrumentId>,
7552 price_type: Option<&PriceType>,
7553 aggregation_source: AggregationSource,
7554 ) -> Vec<&BarType> {
7555 let mut bar_types = self
7556 .bars
7557 .keys()
7558 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
7559 .collect::<Vec<&BarType>>();
7560
7561 if let Some(instrument_id) = instrument_id {
7562 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
7563 }
7564
7565 if let Some(price_type) = price_type {
7566 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
7567 }
7568
7569 bar_types
7570 }
7571
7572 #[must_use]
7576 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
7577 self.synthetics.get(instrument_id)
7578 }
7579
7580 pub fn try_synthetic(
7587 &self,
7588 instrument_id: &InstrumentId,
7589 ) -> Result<&SyntheticInstrument, SyntheticInstrumentLookupError> {
7590 self.synthetics
7591 .get(instrument_id)
7592 .ok_or_else(|| SyntheticInstrumentLookupError::not_found(*instrument_id))
7593 }
7594
7595 #[must_use]
7597 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
7598 self.synthetics.keys().collect()
7599 }
7600
7601 #[must_use]
7603 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
7604 self.synthetics.values().collect()
7605 }
7606
7607 #[must_use]
7611 pub fn account_ref(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
7612 self.accounts
7613 .get(account_id)
7614 .map(|account_cell| AccountRef::new(account_cell.borrow()))
7615 }
7616
7617 #[must_use]
7621 pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
7622 self.account_ref(account_id)
7623 }
7624
7625 pub fn try_account_ref(
7631 &self,
7632 account_id: &AccountId,
7633 ) -> Result<AccountRef<'_>, AccountLookupError> {
7634 self.accounts
7635 .get(account_id)
7636 .map(|account_cell| AccountRef::new(account_cell.borrow()))
7637 .ok_or_else(|| AccountLookupError::not_found(*account_id))
7638 }
7639
7640 pub fn try_account(
7648 &self,
7649 account_id: &AccountId,
7650 ) -> Result<AccountRef<'_>, AccountLookupError> {
7651 self.try_account_ref(account_id)
7652 }
7653
7654 #[must_use]
7664 pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
7665 self.accounts
7666 .get(account_id)
7667 .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
7668 }
7669
7670 #[must_use]
7675 pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
7676 self.accounts
7677 .get(account_id)
7678 .map(|account_cell| account_cell.borrow().clone())
7679 }
7680
7681 #[must_use]
7683 pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
7684 self.index
7685 .venue_account
7686 .get(venue)
7687 .and_then(|account_id| self.accounts.get(account_id))
7688 .map(|account_cell| AccountRef::new(account_cell.borrow()))
7689 }
7690
7691 #[must_use]
7696 pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
7697 self.index
7698 .venue_account
7699 .get(venue)
7700 .and_then(|account_id| self.accounts.get(account_id))
7701 .map(|account_cell| account_cell.borrow().clone())
7702 }
7703
7704 #[must_use]
7706 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
7707 self.index.venue_account.get(venue)
7708 }
7709
7710 #[must_use]
7716 pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
7717 self.accounts
7718 .values()
7719 .filter(|account_cell| &account_cell.borrow().id() == account_id)
7720 .map(|account_cell| AccountRef::new(account_cell.borrow()))
7721 .collect()
7722 }
7723
7724 #[must_use]
7726 pub fn accounts_all_owned(&self) -> Vec<AccountAny> {
7727 self.accounts
7728 .values()
7729 .map(|account_cell| account_cell.borrow().clone())
7730 .collect()
7731 }
7732
7733 pub fn update_own_order_book(&mut self, order: &OrderAny) {
7741 if !order.has_price() {
7742 return;
7743 }
7744
7745 let instrument_id = order.instrument_id();
7746
7747 if !self.own_books.contains_key(&instrument_id) {
7748 if order.is_closed() {
7749 return;
7750 }
7751
7752 self.own_books
7753 .insert(instrument_id, OwnOrderBook::new(instrument_id));
7754 }
7755
7756 let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
7757 return;
7758 };
7759
7760 let own_book_order = order.to_own_book_order();
7761
7762 if order.is_closed() {
7763 if let Err(e) = own_book.delete(own_book_order) {
7764 log::debug!(
7765 "Failed to delete order {} from own book: {e}",
7766 order.client_order_id(),
7767 );
7768 } else {
7769 log::debug!("Deleted order {} from own book", order.client_order_id());
7770 }
7771 } else {
7772 if let Err(e) = own_book.update(own_book_order) {
7774 log::debug!(
7775 "Failed to update order {} in own book: {e}; inserting instead",
7776 order.client_order_id(),
7777 );
7778 own_book.add(own_book_order);
7779 }
7780 log::debug!("Updated order {} in own book", order.client_order_id());
7781 }
7782 }
7783
7784 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
7790 let Some(order_cell) = self.orders.get(client_order_id) else {
7791 return;
7792 };
7793 let order = order_cell.borrow();
7794 let instrument_id = order.instrument_id();
7795 let own_book_order = if order.has_price() {
7796 Some(order.to_own_book_order())
7797 } else {
7798 None
7799 };
7800 drop(order);
7801
7802 self.index.orders_open.remove(client_order_id);
7803 self.index.orders_pending_cancel.remove(client_order_id);
7804 self.index.orders_inflight.remove(client_order_id);
7805 self.index.orders_emulated.remove(client_order_id);
7806 self.index.orders_active_local.remove(client_order_id);
7807
7808 if let Some(own_book) = self.own_books.get_mut(&instrument_id)
7809 && let Some(own_book_order) = own_book_order
7810 {
7811 if let Err(e) = own_book.delete(own_book_order) {
7812 log::debug!("Could not force delete {client_order_id} from own book: {e}");
7813 } else {
7814 log::debug!("Force deleted {client_order_id} from own book");
7815 }
7816 }
7817
7818 self.index.orders_closed.insert(*client_order_id);
7819 }
7820
7821 pub fn audit_own_order_books(&mut self) {
7828 log::debug!("Starting own books audit");
7829 let start = std::time::Instant::now();
7830
7831 let valid_order_ids: AHashSet<ClientOrderId> = self
7834 .index
7835 .orders_open
7836 .union(&self.index.orders_inflight)
7837 .copied()
7838 .collect();
7839
7840 for own_book in self.own_books.values_mut() {
7841 own_book.audit_open_orders(&valid_order_ids);
7842 }
7843
7844 log::debug!("Completed own books audit in {:?}", start.elapsed());
7845 }
7846}
7847
7848fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
7849 let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
7850 anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
7851 };
7852
7853 let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
7854 anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
7855 };
7856
7857 if position_id.is_empty() {
7858 anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
7859 }
7860
7861 let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
7862 anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
7863 })?;
7864
7865 Ok((PositionId::new(position_id), snapshot_index))
7866}
7867
7868fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
7869 let snapshot = serde_json::from_slice::<Position>(blob)?;
7870 let expected_prefix = format!("{}-", position_id.as_str());
7871
7872 let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
7873 anyhow::bail!(
7874 "position snapshot id {} does not match blob_ref position {position_id}",
7875 snapshot.id
7876 );
7877 };
7878
7879 if UUID4::from_str(snapshot_uuid).is_err() {
7880 anyhow::bail!(
7881 "position snapshot id {} does not match blob_ref position {position_id}",
7882 snapshot.id
7883 );
7884 }
7885
7886 Ok(())
7887}