1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24pub mod refs;
25
26mod bounded;
27mod index;
28
29#[cfg(test)]
30mod tests;
31
32use std::{
33 borrow::Cow,
34 cell::{Ref, RefCell},
35 fmt::{Debug, Display},
36 rc::Rc,
37 str::FromStr,
38 time::{SystemTime, UNIX_EPOCH},
39};
40
41use ahash::{AHashMap, AHashSet};
42use bounded::BoundedVecDeque;
43use bytes::Bytes;
44pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
46use index::CacheIndex;
47use nautilus_core::{
48 SharedCell, UUID4, UnixNanos,
49 correctness::{
50 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
51 check_valid_string_ascii,
52 },
53 datetime::secs_to_nanos_unchecked,
54};
55use nautilus_model::{
56 accounts::{Account, AccountAny},
57 data::{
58 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
59 MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
60 },
61 enums::{
62 AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
63 PriceType, TriggerType,
64 },
65 events::{AccountState, OrderEventAny},
66 identifiers::{
67 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
68 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
69 },
70 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
71 orderbook::{
72 OrderBook,
73 own::{OwnOrderBook, should_handle_own_book_order},
74 },
75 orders::{Order, OrderAny, OrderError, OrderList},
76 position::Position,
77 types::{Currency, Money, Price, Quantity},
78};
79pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
80use rust_decimal::Decimal;
81use ustr::Ustr;
82
83use crate::xrate::get_exchange_rate;
84
85#[derive(Clone, Debug, PartialEq, Eq)]
90pub struct CacheSnapshotRef {
91 pub blob_ref: String,
93 pub blob: Bytes,
95}
96
97impl CacheSnapshotRef {
98 #[must_use]
100 pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
101 Self {
102 blob_ref: blob_ref.into(),
103 blob: blob.into(),
104 }
105 }
106}
107
108#[derive(Clone, Debug)]
113pub struct CacheView {
114 inner: Rc<RefCell<Cache>>,
115}
116
117impl CacheView {
118 #[must_use]
120 pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
121 Self { inner }
122 }
123
124 pub fn borrow(&self) -> Ref<'_, Cache> {
130 self.inner.borrow()
131 }
132}
133
134impl From<Rc<RefCell<Cache>>> for CacheView {
135 fn from(inner: Rc<RefCell<Cache>>) -> Self {
136 Self::new(inner)
137 }
138}
139
/// Outcome of resolving a set of optional query filters into index sets.
///
/// NOTE(review): the construction and consumption sites are outside this
/// section; the variant descriptions below are inferred from the names and
/// should be confirmed against the callers.
enum FilterSources<'a, K> {
    /// Presumably: no filter was supplied, so every candidate qualifies.
    Unfiltered,
    /// Presumably: at least one supplied filter has no matching index set, so
    /// the result is known to be empty without intersecting anything.
    Empty,
    /// Borrowed index sets, one per active filter, still to be intersected.
    Sets(Vec<&'a AHashSet<K>>),
}
151
152fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
158where
159 K: Copy + Eq + std::hash::Hash,
160{
161 debug_assert!(!sources.is_empty());
162 sources.sort_unstable_by_key(|s| s.len());
163 let driver = sources[0];
164 let rest = &sources[1..];
165
166 if rest.is_empty() {
167 return driver.clone();
168 }
169
170 driver
171 .iter()
172 .filter(|id| rest.iter().all(|s| s.contains(id)))
173 .copied()
174 .collect()
175}
176
177fn intersect_pair_or_many<'a, K>(
185 bucket: &'a AHashSet<K>,
186 mut sources: Vec<&'a AHashSet<K>>,
187) -> AHashSet<K>
188where
189 K: Copy + Eq + std::hash::Hash,
190{
191 debug_assert!(!sources.is_empty());
192 if sources.len() == 1 {
193 let filter = sources[0];
194 let (larger, smaller) = if bucket.len() >= filter.len() {
195 (bucket, filter)
196 } else {
197 (filter, bucket)
198 };
199 return larger.intersection(smaller).copied().collect();
200 }
201
202 sources.push(bucket);
203 intersect_filter_sources(sources)
204}
205
/// In-memory cache of market data and execution state, with an optional
/// database adapter used to load persisted objects.
///
/// Accounts, orders and positions are stored behind `SharedCell` handles; the
/// secondary lookup structures for them live in `index`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    /// Configuration; validated when the cache is constructed.
    config: CacheConfig,
    /// Secondary indexes over accounts, orders and positions.
    index: CacheIndex,
    /// Optional backing database adapter.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General key/value objects stored as raw bytes.
    general: AHashMap<String, Bytes>,
    currencies: AHashMap<Ustr, Currency>,
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    books: AHashMap<InstrumentId, OrderBook>,
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    // Bounded per-instrument (or per-bar-type) data buffers
    quotes: AHashMap<InstrumentId, BoundedVecDeque<QuoteTick>>,
    trades: AHashMap<InstrumentId, BoundedVecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    mark_prices: AHashMap<InstrumentId, BoundedVecDeque<MarkPriceUpdate>>,
    index_prices: AHashMap<InstrumentId, BoundedVecDeque<IndexPriceUpdate>>,
    funding_rates: AHashMap<InstrumentId, BoundedVecDeque<FundingRateUpdate>>,
    instrument_statuses: AHashMap<InstrumentId, BoundedVecDeque<InstrumentStatus>>,
    bars: AHashMap<BarType, BoundedVecDeque<Bar>>,
    greeks: AHashMap<InstrumentId, GreeksData>,
    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
    /// Yield curves keyed by a string (presumably the curve name — TODO confirm).
    yield_curves: AHashMap<String, YieldCurveData>,
    // Execution state held behind shared cells
    accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
    orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
    order_lists: AHashMap<OrderListId, OrderList>,
    positions: AHashMap<PositionId, SharedCell<Position>>,
    /// Serialized position snapshots, several per position ID.
    position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
    /// DeFi-specific cache state (only with the `defi` feature).
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
240
241impl Debug for Cache {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 f.debug_struct(stringify!(Cache))
244 .field("config", &self.config)
245 .field("index", &self.index)
246 .field("general", &self.general)
247 .field("currencies", &self.currencies)
248 .field("instruments", &self.instruments)
249 .field("synthetics", &self.synthetics)
250 .field("books", &self.books)
251 .field("own_books", &self.own_books)
252 .field("quotes", &self.quotes)
253 .field("trades", &self.trades)
254 .field("mark_xrates", &self.mark_xrates)
255 .field("mark_prices", &self.mark_prices)
256 .field("index_prices", &self.index_prices)
257 .field("funding_rates", &self.funding_rates)
258 .field("instrument_statuses", &self.instrument_statuses)
259 .field("bars", &self.bars)
260 .field("greeks", &self.greeks)
261 .field("option_greeks", &self.option_greeks)
262 .field("yield_curves", &self.yield_curves)
263 .field("accounts", &self.accounts)
264 .field("orders", &self.orders)
265 .field("order_lists", &self.order_lists)
266 .field("positions", &self.positions)
267 .field("position_snapshots", &self.position_snapshots)
268 .finish()
269 }
270}
271
272impl Default for Cache {
273 fn default() -> Self {
275 Self::new(Some(CacheConfig::default()), None)
276 }
277}
278
279impl Cache {
280 #[must_use]
282 pub fn new(
290 config: Option<CacheConfig>,
291 database: Option<Box<dyn CacheDatabaseAdapter>>,
292 ) -> Self {
293 let config = config.unwrap_or_default();
294 config.validate().expect("invalid `CacheConfig`");
295
296 Self {
297 config,
298 index: CacheIndex::default(),
299 database,
300 general: AHashMap::new(),
301 currencies: AHashMap::new(),
302 instruments: AHashMap::new(),
303 synthetics: AHashMap::new(),
304 books: AHashMap::new(),
305 own_books: AHashMap::new(),
306 quotes: AHashMap::new(),
307 trades: AHashMap::new(),
308 mark_xrates: AHashMap::new(),
309 mark_prices: AHashMap::new(),
310 index_prices: AHashMap::new(),
311 funding_rates: AHashMap::new(),
312 instrument_statuses: AHashMap::new(),
313 bars: AHashMap::new(),
314 greeks: AHashMap::new(),
315 option_greeks: AHashMap::new(),
316 yield_curves: AHashMap::new(),
317 accounts: AHashMap::new(),
318 orders: AHashMap::new(),
319 order_lists: AHashMap::new(),
320 positions: AHashMap::new(),
321 position_snapshots: AHashMap::new(),
322 #[cfg(feature = "defi")]
323 defi: crate::defi::cache::DefiCache::default(),
324 }
325 }
326
327 #[must_use]
329 pub fn memory_address(&self) -> String {
330 format!("{:?}", std::ptr::from_ref(self))
331 }
332
333 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
337 let type_name = std::any::type_name_of_val(&*database);
338 log::info!("Cache database adapter set: {type_name}");
339 self.database = Some(database);
340 }
341
342 pub fn cache_general(&mut self) -> anyhow::Result<()> {
350 self.general = match &mut self.database {
351 Some(db) => db.load()?,
352 None => AHashMap::new(),
353 };
354
355 log::info!(
356 "Cached {} general object(s) from database",
357 self.general.len()
358 );
359 Ok(())
360 }
361
362 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
368 let cache_map = match &self.database {
369 Some(db) => db.load_all().await?,
370 None => CacheMap::default(),
371 };
372
373 self.currencies = cache_map.currencies;
374 self.instruments = cache_map.instruments;
375 self.synthetics = cache_map.synthetics;
376 self.accounts = cache_map
377 .accounts
378 .into_iter()
379 .map(|(id, account)| (id, SharedCell::new(account)))
380 .collect();
381 self.orders = cache_map
382 .orders
383 .into_iter()
384 .map(|(id, order)| (id, SharedCell::new(order)))
385 .collect();
386 self.positions = cache_map
387 .positions
388 .into_iter()
389 .map(|(id, position)| (id, SharedCell::new(position)))
390 .collect();
391
392 self.assign_position_ids_to_contingencies();
393 Ok(())
394 }
395
396 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
402 self.currencies = match &mut self.database {
403 Some(db) => db.load_currencies().await?,
404 None => AHashMap::new(),
405 };
406
407 log::info!("Cached {} currencies from database", self.general.len());
408 Ok(())
409 }
410
411 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
417 self.instruments = match &mut self.database {
418 Some(db) => db.load_instruments().await?,
419 None => AHashMap::new(),
420 };
421
422 log::info!("Cached {} instruments from database", self.general.len());
423 Ok(())
424 }
425
426 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
432 self.synthetics = match &mut self.database {
433 Some(db) => db.load_synthetics().await?,
434 None => AHashMap::new(),
435 };
436
437 log::info!(
438 "Cached {} synthetic instruments from database",
439 self.general.len()
440 );
441 Ok(())
442 }
443
444 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
450 self.accounts = match &mut self.database {
451 Some(db) => db
452 .load_accounts()
453 .await?
454 .into_iter()
455 .map(|(id, account)| (id, SharedCell::new(account)))
456 .collect(),
457 None => AHashMap::new(),
458 };
459
460 log::info!(
461 "Cached {} synthetic instruments from database",
462 self.general.len()
463 );
464 Ok(())
465 }
466
467 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
473 self.orders = match &mut self.database {
474 Some(db) => db
475 .load_orders()
476 .await?
477 .into_iter()
478 .map(|(id, order)| (id, SharedCell::new(order)))
479 .collect(),
480 None => AHashMap::new(),
481 };
482
483 log::info!("Cached {} orders from database", self.general.len());
484
485 self.assign_position_ids_to_contingencies();
486 Ok(())
487 }
488
489 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
495 self.positions = match &mut self.database {
496 Some(db) => db
497 .load_positions()
498 .await?
499 .into_iter()
500 .map(|(id, position)| (id, SharedCell::new(position)))
501 .collect(),
502 None => AHashMap::new(),
503 };
504
505 log::info!("Cached {} positions from database", self.general.len());
506 Ok(())
507 }
508
509 pub fn build_index(&mut self) {
511 log::debug!("Building index");
512
513 for account_id in self.accounts.keys() {
515 self.index
516 .venue_account
517 .insert(account_id.get_issuer(), *account_id);
518 }
519
520 for (client_order_id, order_cell) in &self.orders {
522 let order = order_cell.borrow();
523 let instrument_id = order.instrument_id();
524 let venue = instrument_id.venue;
525 let strategy_id = order.strategy_id();
526
527 self.index
529 .venue_orders
530 .entry(venue)
531 .or_default()
532 .insert(*client_order_id);
533
534 if let Some(venue_order_id) = order.venue_order_id() {
536 self.index
537 .venue_order_ids
538 .insert(venue_order_id, *client_order_id);
539 }
540
541 if let Some(position_id) = order.position_id() {
543 self.index
544 .order_position
545 .insert(*client_order_id, position_id);
546 }
547
548 self.index
550 .order_strategy
551 .insert(*client_order_id, order.strategy_id());
552
553 self.index
555 .instrument_orders
556 .entry(instrument_id)
557 .or_default()
558 .insert(*client_order_id);
559
560 self.index
562 .strategy_orders
563 .entry(strategy_id)
564 .or_default()
565 .insert(*client_order_id);
566
567 if let Some(account_id) = order.account_id() {
569 self.index
570 .account_orders
571 .entry(account_id)
572 .or_default()
573 .insert(*client_order_id);
574 }
575
576 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
578 self.index
579 .exec_algorithm_orders
580 .entry(exec_algorithm_id)
581 .or_default()
582 .insert(*client_order_id);
583 }
584
585 if let Some(exec_spawn_id) = order.exec_spawn_id() {
587 self.index
588 .exec_spawn_orders
589 .entry(exec_spawn_id)
590 .or_default()
591 .insert(*client_order_id);
592 }
593
594 self.index.orders.insert(*client_order_id);
596
597 if order.is_active_local() {
599 self.index.orders_active_local.insert(*client_order_id);
600 }
601
602 if order.is_open() {
604 self.index.orders_open.insert(*client_order_id);
605 }
606
607 if order.is_closed() {
609 self.index.orders_closed.insert(*client_order_id);
610 }
611
612 if let Some(emulation_trigger) = order.emulation_trigger()
614 && emulation_trigger != TriggerType::NoTrigger
615 && !order.is_closed()
616 {
617 self.index.orders_emulated.insert(*client_order_id);
618 }
619
620 if order.is_inflight() {
622 self.index.orders_inflight.insert(*client_order_id);
623 }
624
625 self.index.strategies.insert(strategy_id);
627
628 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
630 self.index.exec_algorithms.insert(exec_algorithm_id);
631 }
632 }
633
634 for (position_id, position_cell) in &self.positions {
636 let position = position_cell.borrow();
637 let instrument_id = position.instrument_id;
638 let venue = instrument_id.venue;
639 let strategy_id = position.strategy_id;
640
641 self.index
643 .venue_positions
644 .entry(venue)
645 .or_default()
646 .insert(*position_id);
647
648 self.index
650 .position_strategy
651 .insert(*position_id, position.strategy_id);
652
653 self.index
655 .position_orders
656 .entry(*position_id)
657 .or_default()
658 .extend(position.client_order_ids());
659
660 self.index
662 .instrument_positions
663 .entry(instrument_id)
664 .or_default()
665 .insert(*position_id);
666
667 self.index
669 .strategy_positions
670 .entry(strategy_id)
671 .or_default()
672 .insert(*position_id);
673
674 self.index
676 .account_positions
677 .entry(position.account_id)
678 .or_default()
679 .insert(*position_id);
680
681 self.index.positions.insert(*position_id);
683
684 if position.is_open() {
686 self.index.positions_open.insert(*position_id);
687 }
688
689 if position.is_closed() {
691 self.index.positions_closed.insert(*position_id);
692 }
693
694 self.index.strategies.insert(strategy_id);
696 }
697 }
698
699 #[must_use]
701 pub const fn has_backing(&self) -> bool {
702 self.database.is_some()
703 }
704
705 #[must_use]
707 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
708 let Some(quote) = self.quote(&position.instrument_id) else {
709 log::warn!(
710 "Cannot calculate unrealized PnL for {}, no quotes for {}",
711 position.id,
712 position.instrument_id
713 );
714 return None;
715 };
716
717 let last = match position.side {
719 PositionSide::Flat | PositionSide::NoPositionSide => {
720 return Some(Money::new(0.0, position.settlement_currency));
721 }
722 PositionSide::Long => quote.bid_price,
723 PositionSide::Short => quote.ask_price,
724 };
725
726 Some(position.unrealized_pnl(last))
727 }
728
729 #[must_use]
738 pub fn check_integrity(&mut self) -> bool {
739 let mut error_count = 0;
740 let failure = "Integrity failure";
741
742 let timestamp_us = SystemTime::now()
744 .duration_since(UNIX_EPOCH)
745 .expect("Time went backwards")
746 .as_micros();
747
748 log::info!("Checking data integrity");
749
750 for account_id in self.accounts.keys() {
752 if !self
753 .index
754 .venue_account
755 .contains_key(&account_id.get_issuer())
756 {
757 log::error!(
758 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
759 );
760 error_count += 1;
761 }
762 }
763
764 for (client_order_id, order_cell) in &self.orders {
765 let order = order_cell.borrow();
766
767 if !self.index.order_strategy.contains_key(client_order_id) {
768 log::error!(
769 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
770 );
771 error_count += 1;
772 }
773
774 if !self.index.orders.contains(client_order_id) {
775 log::error!(
776 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
777 );
778 error_count += 1;
779 }
780
781 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
782 log::error!(
783 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
784 );
785 error_count += 1;
786 }
787
788 if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
789 {
790 log::error!(
791 "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
792 );
793 error_count += 1;
794 }
795
796 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
797 log::error!(
798 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
799 );
800 error_count += 1;
801 }
802
803 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
804 log::error!(
805 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
806 );
807 error_count += 1;
808 }
809
810 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
811 if !self
812 .index
813 .exec_algorithm_orders
814 .contains_key(&exec_algorithm_id)
815 {
816 log::error!(
817 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
818 );
819 error_count += 1;
820 }
821
822 if order.exec_spawn_id().is_none()
823 && !self.index.exec_spawn_orders.contains_key(client_order_id)
824 {
825 log::error!(
826 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
827 );
828 error_count += 1;
829 }
830 }
831 }
832
833 for (position_id, position_cell) in &self.positions {
834 let position = position_cell.borrow();
835
836 if !self.index.position_strategy.contains_key(position_id) {
837 log::error!(
838 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
839 );
840 error_count += 1;
841 }
842
843 if !self.index.position_orders.contains_key(position_id) {
844 log::error!(
845 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
846 );
847 error_count += 1;
848 }
849
850 if !self.index.positions.contains(position_id) {
851 log::error!(
852 "{failure} in positions: {position_id} not found in `self.index.positions`",
853 );
854 error_count += 1;
855 }
856
857 if position.is_open() && !self.index.positions_open.contains(position_id) {
858 log::error!(
859 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
860 );
861 error_count += 1;
862 }
863
864 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
865 log::error!(
866 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
867 );
868 error_count += 1;
869 }
870 }
871
872 for account_id in self.index.venue_account.values() {
874 if !self.accounts.contains_key(account_id) {
875 log::error!(
876 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
877 );
878 error_count += 1;
879 }
880 }
881
882 for client_order_id in self.index.venue_order_ids.values() {
883 if !self.orders.contains_key(client_order_id) {
884 log::error!(
885 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
886 );
887 error_count += 1;
888 }
889 }
890
891 for client_order_id in self.index.client_order_ids.keys() {
892 if !self.orders.contains_key(client_order_id) {
893 log::error!(
894 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
895 );
896 error_count += 1;
897 }
898 }
899
900 for client_order_id in self.index.order_position.keys() {
901 if !self.orders.contains_key(client_order_id) {
902 log::error!(
903 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
904 );
905 error_count += 1;
906 }
907 }
908
909 for client_order_id in self.index.order_strategy.keys() {
911 if !self.orders.contains_key(client_order_id) {
912 log::error!(
913 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
914 );
915 error_count += 1;
916 }
917 }
918
919 for position_id in self.index.position_strategy.keys() {
920 if !self.positions.contains_key(position_id) {
921 log::error!(
922 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
923 );
924 error_count += 1;
925 }
926 }
927
928 for position_id in self.index.position_orders.keys() {
929 if !self.positions.contains_key(position_id) {
930 log::error!(
931 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
932 );
933 error_count += 1;
934 }
935 }
936
937 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
938 for client_order_id in client_order_ids {
939 if !self.orders.contains_key(client_order_id) {
940 log::error!(
941 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
942 );
943 error_count += 1;
944 }
945 }
946 }
947
948 for instrument_id in self.index.instrument_positions.keys() {
949 if !self.index.instrument_orders.contains_key(instrument_id) {
950 log::error!(
951 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
952 );
953 error_count += 1;
954 }
955 }
956
957 for client_order_ids in self.index.strategy_orders.values() {
958 for client_order_id in client_order_ids {
959 if !self.orders.contains_key(client_order_id) {
960 log::error!(
961 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
962 );
963 error_count += 1;
964 }
965 }
966 }
967
968 for position_ids in self.index.strategy_positions.values() {
969 for position_id in position_ids {
970 if !self.positions.contains_key(position_id) {
971 log::error!(
972 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
973 );
974 error_count += 1;
975 }
976 }
977 }
978
979 for client_order_id in &self.index.orders {
980 if !self.orders.contains_key(client_order_id) {
981 log::error!(
982 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
983 );
984 error_count += 1;
985 }
986 }
987
988 for client_order_id in &self.index.orders_emulated {
989 if !self.orders.contains_key(client_order_id) {
990 log::error!(
991 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
992 );
993 error_count += 1;
994 }
995 }
996
997 for client_order_id in &self.index.orders_active_local {
998 if !self.orders.contains_key(client_order_id) {
999 log::error!(
1000 "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
1001 );
1002 error_count += 1;
1003 }
1004 }
1005
1006 for client_order_id in &self.index.orders_inflight {
1007 if !self.orders.contains_key(client_order_id) {
1008 log::error!(
1009 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
1010 );
1011 error_count += 1;
1012 }
1013 }
1014
1015 for client_order_id in &self.index.orders_open {
1016 if !self.orders.contains_key(client_order_id) {
1017 log::error!(
1018 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
1019 );
1020 error_count += 1;
1021 }
1022 }
1023
1024 for client_order_id in &self.index.orders_closed {
1025 if !self.orders.contains_key(client_order_id) {
1026 log::error!(
1027 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
1028 );
1029 error_count += 1;
1030 }
1031 }
1032
1033 for position_id in &self.index.positions {
1034 if !self.positions.contains_key(position_id) {
1035 log::error!(
1036 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
1037 );
1038 error_count += 1;
1039 }
1040 }
1041
1042 for position_id in &self.index.positions_open {
1043 if !self.positions.contains_key(position_id) {
1044 log::error!(
1045 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
1046 );
1047 error_count += 1;
1048 }
1049 }
1050
1051 for position_id in &self.index.positions_closed {
1052 if !self.positions.contains_key(position_id) {
1053 log::error!(
1054 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
1055 );
1056 error_count += 1;
1057 }
1058 }
1059
1060 for strategy_id in &self.index.strategies {
1061 if !self.index.strategy_orders.contains_key(strategy_id) {
1062 log::error!(
1063 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
1064 );
1065 error_count += 1;
1066 }
1067 }
1068
1069 for exec_algorithm_id in &self.index.exec_algorithms {
1070 if !self
1071 .index
1072 .exec_algorithm_orders
1073 .contains_key(exec_algorithm_id)
1074 {
1075 log::error!(
1076 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
1077 );
1078 error_count += 1;
1079 }
1080 }
1081
1082 let total_us = SystemTime::now()
1083 .duration_since(UNIX_EPOCH)
1084 .expect("Time went backwards")
1085 .as_micros()
1086 - timestamp_us;
1087
1088 if error_count == 0 {
1089 log::info!("Integrity check passed in {total_us}μs");
1090 true
1091 } else {
1092 log::error!(
1093 "Integrity check failed with {error_count} error{} in {total_us}μs",
1094 if error_count == 1 { "" } else { "s" },
1095 );
1096 false
1097 }
1098 }
1099
1100 #[must_use]
1104 pub fn check_residuals(&self) -> bool {
1105 log::debug!("Checking residuals");
1106
1107 let mut residuals = false;
1108
1109 for order in self.orders_open(None, None, None, None, None) {
1111 residuals = true;
1112 log::warn!("Residual {order}");
1113 }
1114
1115 for position in self.positions_open(None, None, None, None, None) {
1117 residuals = true;
1118 log::warn!("Residual {position}");
1119 }
1120
1121 residuals
1122 }
1123
1124 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1130 log::debug!(
1131 "Purging closed orders{}",
1132 if buffer_secs > 0 {
1133 format!(" with buffer_secs={buffer_secs}")
1134 } else {
1135 String::new()
1136 }
1137 );
1138
1139 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1140
1141 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
1142
1143 'outer: for client_order_id in self.index.orders_closed.clone() {
1144 let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
1145 let order = order_cell.borrow();
1146 if order.is_closed()
1147 && let Some(ts_closed) = order.ts_closed()
1148 && ts_closed + buffer_ns <= ts_now
1149 {
1150 let linked = order.linked_order_ids().map(<[_]>::to_vec);
1151 let order_list_id = order.order_list_id();
1152 Some((linked, order_list_id))
1153 } else {
1154 None
1155 }
1156 });
1157
1158 let Some((linked, order_list_id)) = purge_target else {
1159 continue;
1160 };
1161
1162 if let Some(linked_order_ids) = linked {
1164 for linked_order_id in &linked_order_ids {
1165 if let Some(linked_order_cell) = self.orders.get(linked_order_id)
1166 && linked_order_cell.borrow().is_open()
1167 {
1168 continue 'outer;
1170 }
1171 }
1172 }
1173
1174 if let Some(order_list_id) = order_list_id {
1175 affected_order_list_ids.insert(order_list_id);
1176 }
1177
1178 self.purge_order(client_order_id);
1179 }
1180
1181 for order_list_id in affected_order_list_ids {
1182 if let Some(order_list) = self.order_lists.get(&order_list_id) {
1183 let all_purged = order_list
1184 .client_order_ids
1185 .iter()
1186 .all(|id| !self.orders.contains_key(id));
1187
1188 if all_purged {
1189 self.order_lists.remove(&order_list_id);
1190 log::info!("Purged {order_list_id}");
1191 }
1192 }
1193 }
1194 }
1195
1196 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1198 log::debug!(
1199 "Purging closed positions{}",
1200 if buffer_secs > 0 {
1201 format!(" with buffer_secs={buffer_secs}")
1202 } else {
1203 String::new()
1204 }
1205 );
1206
1207 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1208
1209 for position_id in self.index.positions_closed.clone() {
1210 let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
1211 let position = cell.borrow();
1212 position.is_closed()
1213 && position
1214 .ts_closed
1215 .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
1216 });
1217
1218 if should_purge {
1219 self.purge_position(position_id);
1220 }
1221 }
1222 }
1223
1224 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1228 let order_cell = self.orders.get(&client_order_id).cloned();
1230
1231 if let Some(ref order_cell) = order_cell
1233 && order_cell.borrow().is_open()
1234 {
1235 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1236 return;
1237 }
1238
1239 if let Some(ref order_cell) = order_cell {
1241 let order = order_cell.borrow();
1242 self.orders.remove(&client_order_id);
1244
1245 if let Some(venue_orders) = self
1247 .index
1248 .venue_orders
1249 .get_mut(&order.instrument_id().venue)
1250 {
1251 venue_orders.remove(&client_order_id);
1252 if venue_orders.is_empty() {
1253 self.index.venue_orders.remove(&order.instrument_id().venue);
1254 }
1255 }
1256
1257 if let Some(venue_order_id) = order.venue_order_id() {
1259 self.index.venue_order_ids.remove(&venue_order_id);
1260 }
1261
1262 if let Some(instrument_orders) =
1264 self.index.instrument_orders.get_mut(&order.instrument_id())
1265 {
1266 instrument_orders.remove(&client_order_id);
1267 if instrument_orders.is_empty() {
1268 self.index.instrument_orders.remove(&order.instrument_id());
1269 }
1270 }
1271
1272 if let Some(position_id) = order.position_id()
1274 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1275 {
1276 position_orders.remove(&client_order_id);
1277 if position_orders.is_empty() {
1278 self.index.position_orders.remove(&position_id);
1279 }
1280 }
1281
1282 if let Some(exec_algorithm_id) = order.exec_algorithm_id()
1284 && let Some(exec_algorithm_orders) =
1285 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1286 {
1287 exec_algorithm_orders.remove(&client_order_id);
1288 if exec_algorithm_orders.is_empty() {
1289 self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1290 }
1291 }
1292
1293 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
1295 {
1296 strategy_orders.remove(&client_order_id);
1297 if strategy_orders.is_empty() {
1298 self.index.strategy_orders.remove(&order.strategy_id());
1299 }
1300 }
1301
1302 if let Some(account_id) = order.account_id()
1304 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1305 {
1306 account_orders.remove(&client_order_id);
1307 if account_orders.is_empty() {
1308 self.index.account_orders.remove(&account_id);
1309 }
1310 }
1311
1312 if let Some(exec_spawn_id) = order.exec_spawn_id()
1314 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1315 {
1316 spawn_orders.remove(&client_order_id);
1317 if spawn_orders.is_empty() {
1318 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1319 }
1320 }
1321
1322 log::info!("Purged order {client_order_id}");
1323 } else {
1324 log::warn!("Order {client_order_id} not found when purging");
1325 }
1326
1327 self.index.order_position.remove(&client_order_id);
1329 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1330 self.index.order_client.remove(&client_order_id);
1331 self.index.client_order_ids.remove(&client_order_id);
1332
1333 if let Some(strategy_id) = strategy_id
1335 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1336 {
1337 strategy_orders.remove(&client_order_id);
1338 if strategy_orders.is_empty() {
1339 self.index.strategy_orders.remove(&strategy_id);
1340 }
1341 }
1342
1343 self.index.exec_spawn_orders.remove(&client_order_id);
1345
1346 self.index.orders.remove(&client_order_id);
1347 self.index.orders_active_local.remove(&client_order_id);
1348 self.index.orders_open.remove(&client_order_id);
1349 self.index.orders_closed.remove(&client_order_id);
1350 self.index.orders_emulated.remove(&client_order_id);
1351 self.index.orders_inflight.remove(&client_order_id);
1352 self.index.orders_pending_cancel.remove(&client_order_id);
1353 }
1354
1355 pub fn purge_position(&mut self, position_id: PositionId) {
1359 let position = self
1361 .positions
1362 .get(&position_id)
1363 .map(|cell| cell.borrow().clone());
1364
1365 if let Some(ref pos) = position
1367 && pos.is_open()
1368 {
1369 log::warn!("Position {position_id} found open when purging, skipping purge");
1370 return;
1371 }
1372
1373 if let Some(ref pos) = position {
1375 self.positions.remove(&position_id);
1376
1377 if let Some(venue_positions) =
1379 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1380 {
1381 venue_positions.remove(&position_id);
1382 if venue_positions.is_empty() {
1383 self.index.venue_positions.remove(&pos.instrument_id.venue);
1384 }
1385 }
1386
1387 if let Some(instrument_positions) =
1389 self.index.instrument_positions.get_mut(&pos.instrument_id)
1390 {
1391 instrument_positions.remove(&position_id);
1392 if instrument_positions.is_empty() {
1393 self.index.instrument_positions.remove(&pos.instrument_id);
1394 }
1395 }
1396
1397 if let Some(strategy_positions) =
1399 self.index.strategy_positions.get_mut(&pos.strategy_id)
1400 {
1401 strategy_positions.remove(&position_id);
1402 if strategy_positions.is_empty() {
1403 self.index.strategy_positions.remove(&pos.strategy_id);
1404 }
1405 }
1406
1407 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1409 account_positions.remove(&position_id);
1410 if account_positions.is_empty() {
1411 self.index.account_positions.remove(&pos.account_id);
1412 }
1413 }
1414
1415 for client_order_id in pos.client_order_ids() {
1417 self.index.order_position.remove(&client_order_id);
1418 }
1419
1420 log::info!("Purged position {position_id}");
1421 } else {
1422 log::warn!("Position {position_id} not found when purging");
1423 }
1424
1425 self.index.position_strategy.remove(&position_id);
1427 self.index.position_orders.remove(&position_id);
1428 self.index.positions.remove(&position_id);
1429 self.index.positions_open.remove(&position_id);
1430 self.index.positions_closed.remove(&position_id);
1431
1432 self.position_snapshots.remove(&position_id);
1434 }
1435
1436 pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
1460 #[cfg(feature = "defi")]
1461 let defi_found = self.defi.pools.contains_key(&instrument_id)
1462 || self.defi.pool_profilers.contains_key(&instrument_id);
1463 #[cfg(not(feature = "defi"))]
1464 let defi_found = false;
1465
1466 let found = self.instruments.contains_key(&instrument_id)
1467 || self.synthetics.contains_key(&instrument_id)
1468 || defi_found;
1469
1470 if !found {
1471 log::warn!("Instrument {instrument_id} not found when purging");
1472 return;
1473 }
1474
1475 if let Some(orders) = self.index.instrument_orders.get(&instrument_id) {
1476 let has_non_terminal = orders
1477 .iter()
1478 .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
1479
1480 if has_non_terminal {
1481 log::warn!(
1482 "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
1483 );
1484 return;
1485 }
1486 }
1487
1488 if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
1489 let has_non_closed = positions
1490 .iter()
1491 .any(|position_id| !self.index.positions_closed.contains(position_id));
1492
1493 if has_non_closed {
1494 log::warn!(
1495 "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
1496 );
1497 return;
1498 }
1499 }
1500
1501 self.instruments.remove(&instrument_id);
1502 self.synthetics.remove(&instrument_id);
1503 self.books.remove(&instrument_id);
1504 self.own_books.remove(&instrument_id);
1505 self.quotes.remove(&instrument_id);
1506 self.trades.remove(&instrument_id);
1507 self.mark_prices.remove(&instrument_id);
1508 self.index_prices.remove(&instrument_id);
1509 self.funding_rates.remove(&instrument_id);
1510 self.instrument_statuses.remove(&instrument_id);
1511 self.greeks.remove(&instrument_id);
1512 self.option_greeks.remove(&instrument_id);
1513
1514 self.bars
1515 .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
1516
1517 #[cfg(feature = "defi")]
1518 {
1519 self.defi.pools.remove(&instrument_id);
1520 self.defi.pool_profilers.remove(&instrument_id);
1521 }
1522
1523 self.index.instrument_orders.remove(&instrument_id);
1524 self.index.instrument_positions.remove(&instrument_id);
1525
1526 log::info!("Purged instrument {instrument_id}");
1527 }
1528
1529 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1534 log::debug!(
1535 "Purging account events{}",
1536 if lookback_secs > 0 {
1537 format!(" with lookback_secs={lookback_secs}")
1538 } else {
1539 String::new()
1540 }
1541 );
1542
1543 for account_cell in self.accounts.values() {
1544 let mut account = account_cell.borrow_mut();
1545 let event_count = account.event_count();
1546 account.purge_account_events(ts_now, lookback_secs);
1547 let count_diff = event_count - account.event_count();
1548 if count_diff > 0 {
1549 log::info!(
1550 "Purged {} event(s) from account {}",
1551 count_diff,
1552 account.id()
1553 );
1554 }
1555 }
1556 }
1557
1558 pub fn clear_index(&mut self) {
1560 self.index.clear();
1561 log::debug!("Cleared index");
1562 }
1563
1564 pub fn reset(&mut self) {
1570 log::debug!("Resetting cache");
1571
1572 self.general.clear();
1573 self.books.clear();
1574 self.own_books.clear();
1575 self.quotes.clear();
1576 self.trades.clear();
1577 self.mark_xrates.clear();
1578 self.mark_prices.clear();
1579 self.index_prices.clear();
1580 self.funding_rates.clear();
1581 self.instrument_statuses.clear();
1582 self.bars.clear();
1583 self.accounts.clear();
1584 self.orders.clear();
1585 self.order_lists.clear();
1586 self.positions.clear();
1587 self.position_snapshots.clear();
1588 self.greeks.clear();
1589 self.yield_curves.clear();
1590
1591 if self.config.drop_instruments_on_reset {
1592 self.currencies.clear();
1593 self.instruments.clear();
1594 self.synthetics.clear();
1595 }
1596
1597 #[cfg(feature = "defi")]
1598 {
1599 self.defi.pools.clear();
1600 self.defi.pool_profilers.clear();
1601 }
1602
1603 self.clear_index();
1604
1605 log::info!("Reset cache");
1606 }
1607
1608 pub fn dispose(&mut self) {
1612 self.reset();
1613
1614 if let Some(database) = &mut self.database
1615 && let Err(e) = database.close()
1616 {
1617 log::error!("Failed to close database during dispose: {e}");
1618 }
1619 }
1620
1621 pub fn flush_db(&mut self) {
1625 if let Some(database) = &mut self.database
1626 && let Err(e) = database.flush()
1627 {
1628 log::error!("Failed to flush database: {e}");
1629 }
1630 }
1631
1632 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1640 check_valid_string_ascii(key, stringify!(key))?;
1641 check_predicate_false(value.is_empty(), stringify!(value))?;
1642
1643 log::debug!("Adding general {key}");
1644 self.general.insert(key.to_string(), value.clone());
1645
1646 if let Some(database) = &mut self.database {
1647 database.add(key.to_string(), value)?;
1648 }
1649 Ok(())
1650 }
1651
1652 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1658 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1659
1660 if self.config.save_market_data
1661 && let Some(database) = &mut self.database
1662 {
1663 database.add_order_book(&book)?;
1664 }
1665
1666 self.books.insert(book.instrument_id, book);
1667 Ok(())
1668 }
1669
1670 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1676 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1677
1678 self.own_books.insert(own_book.instrument_id, own_book);
1679 Ok(())
1680 }
1681
1682 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1688 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1689
1690 if self.config.save_market_data {
1691 }
1693
1694 let mark_prices_deque = self
1695 .mark_prices
1696 .entry(mark_price.instrument_id)
1697 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1698 mark_prices_deque.push_front(mark_price);
1699 Ok(())
1700 }
1701
1702 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1708 log::debug!(
1709 "Adding `IndexPriceUpdate` for {}",
1710 index_price.instrument_id
1711 );
1712
1713 if self.config.save_market_data {
1714 }
1716
1717 let index_prices_deque = self
1718 .index_prices
1719 .entry(index_price.instrument_id)
1720 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1721 index_prices_deque.push_front(index_price);
1722 Ok(())
1723 }
1724
1725 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1731 log::debug!(
1732 "Adding `FundingRateUpdate` for {}",
1733 funding_rate.instrument_id
1734 );
1735
1736 if self.config.save_market_data {
1737 }
1739
1740 let funding_rates_deque = self
1741 .funding_rates
1742 .entry(funding_rate.instrument_id)
1743 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1744 funding_rates_deque.push_front(funding_rate);
1745 Ok(())
1746 }
1747
1748 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1754 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1755
1756 let instrument_id = funding_rates[0].instrument_id;
1757 log::debug!(
1758 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1759 funding_rates.len()
1760 );
1761
1762 if self.config.save_market_data
1763 && let Some(database) = &mut self.database
1764 {
1765 for funding_rate in funding_rates {
1766 database.add_funding_rate(funding_rate)?;
1767 }
1768 }
1769
1770 let funding_rate_deque = self
1771 .funding_rates
1772 .entry(instrument_id)
1773 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1774
1775 for funding_rate in funding_rates {
1776 funding_rate_deque.push_front(*funding_rate);
1777 }
1778 Ok(())
1779 }
1780
1781 pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1787 log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1788
1789 if self.config.save_market_data {
1790 }
1792
1793 let statuses_deque = self
1794 .instrument_statuses
1795 .entry(status.instrument_id)
1796 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1797 statuses_deque.push_front(status);
1798 Ok(())
1799 }
1800
1801 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1807 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1808
1809 if self.config.save_market_data
1810 && let Some(database) = &mut self.database
1811 {
1812 database.add_quote("e)?;
1813 }
1814
1815 let quotes_deque = self
1816 .quotes
1817 .entry(quote.instrument_id)
1818 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1819 quotes_deque.push_front(quote);
1820 Ok(())
1821 }
1822
1823 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1829 check_slice_not_empty(quotes, stringify!(quotes))?;
1830
1831 let instrument_id = quotes[0].instrument_id;
1832 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1833
1834 if self.config.save_market_data
1835 && let Some(database) = &mut self.database
1836 {
1837 for quote in quotes {
1838 database.add_quote(quote)?;
1839 }
1840 }
1841
1842 let quotes_deque = self
1843 .quotes
1844 .entry(instrument_id)
1845 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1846
1847 for quote in quotes {
1848 quotes_deque.push_front(*quote);
1849 }
1850 Ok(())
1851 }
1852
1853 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1859 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1860
1861 if self.config.save_market_data
1862 && let Some(database) = &mut self.database
1863 {
1864 database.add_trade(&trade)?;
1865 }
1866
1867 let trades_deque = self
1868 .trades
1869 .entry(trade.instrument_id)
1870 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1871 trades_deque.push_front(trade);
1872 Ok(())
1873 }
1874
1875 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1881 check_slice_not_empty(trades, stringify!(trades))?;
1882
1883 let instrument_id = trades[0].instrument_id;
1884 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1885
1886 if self.config.save_market_data
1887 && let Some(database) = &mut self.database
1888 {
1889 for trade in trades {
1890 database.add_trade(trade)?;
1891 }
1892 }
1893
1894 let trades_deque = self
1895 .trades
1896 .entry(instrument_id)
1897 .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1898
1899 for trade in trades {
1900 trades_deque.push_front(*trade);
1901 }
1902 Ok(())
1903 }
1904
1905 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1911 log::debug!("Adding `Bar` {}", bar.bar_type);
1912
1913 if self.config.save_market_data
1914 && let Some(database) = &mut self.database
1915 {
1916 database.add_bar(&bar)?;
1917 }
1918
1919 let bars = self
1920 .bars
1921 .entry(bar.bar_type)
1922 .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
1923 bars.push_front(bar);
1924 Ok(())
1925 }
1926
1927 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1933 check_slice_not_empty(bars, stringify!(bars))?;
1934
1935 let bar_type = bars[0].bar_type;
1936 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1937
1938 if self.config.save_market_data
1939 && let Some(database) = &mut self.database
1940 {
1941 for bar in bars {
1942 database.add_bar(bar)?;
1943 }
1944 }
1945
1946 let bars_deque = self
1947 .bars
1948 .entry(bar_type)
1949 .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
1950
1951 for bar in bars {
1952 bars_deque.push_front(*bar);
1953 }
1954 Ok(())
1955 }
1956
1957 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1963 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1964
1965 if self.config.save_market_data
1966 && let Some(_database) = &mut self.database
1967 {
1968 }
1970
1971 self.greeks.insert(greeks.instrument_id, greeks);
1972 Ok(())
1973 }
1974
1975 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1977 self.greeks.get(instrument_id).cloned()
1978 }
1979
1980 pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1982 log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1983 self.option_greeks.insert(greeks.instrument_id, greeks);
1984 }
1985
1986 #[must_use]
1988 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1989 self.option_greeks.get(instrument_id)
1990 }
1991
1992 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1998 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1999
2000 if self.config.save_market_data
2001 && let Some(_database) = &mut self.database
2002 {
2003 }
2005
2006 self.yield_curves
2007 .insert(yield_curve.curve_name.clone(), yield_curve);
2008 Ok(())
2009 }
2010
2011 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
2013 self.yield_curves.get(key).map(|curve| {
2014 let curve_clone = curve.clone();
2015 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
2016 as Box<dyn Fn(f64) -> f64>
2017 })
2018 }
2019
2020 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
2026 if self.currencies.contains_key(¤cy.code) {
2027 return Ok(());
2028 }
2029 log::debug!("Adding `Currency` {}", currency.code);
2030
2031 if let Some(database) = &mut self.database {
2032 database.add_currency(¤cy)?;
2033 }
2034
2035 self.currencies.insert(currency.code, currency);
2036 Ok(())
2037 }
2038
2039 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
2045 log::debug!("Adding `Instrument` {}", instrument.id());
2046
2047 if let Some(base_currency) = instrument.base_currency() {
2049 self.add_currency(base_currency)?;
2050 }
2051 self.add_currency(instrument.quote_currency())?;
2052 self.add_currency(instrument.settlement_currency())?;
2053
2054 if let Some(database) = &mut self.database {
2055 database.add_instrument(&instrument)?;
2056 }
2057
2058 self.instruments.insert(instrument.id(), instrument);
2059 Ok(())
2060 }
2061
2062 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
2068 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
2069
2070 if let Some(database) = &mut self.database {
2071 database.add_synthetic(&synthetic)?;
2072 }
2073
2074 self.synthetics.insert(synthetic.id, synthetic);
2075 Ok(())
2076 }
2077
2078 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2084 log::debug!("Adding `Account` {}", account.id());
2085
2086 if let Some(database) = &mut self.database {
2087 database.add_account(&account)?;
2088 }
2089
2090 let account_id = account.id();
2091 self.accounts.insert(account_id, SharedCell::new(account));
2092 self.index
2093 .venue_account
2094 .insert(account_id.get_issuer(), account_id);
2095 Ok(())
2096 }
2097
2098 pub fn add_venue_order_id(
2106 &mut self,
2107 client_order_id: &ClientOrderId,
2108 venue_order_id: &VenueOrderId,
2109 overwrite: bool,
2110 ) -> anyhow::Result<()> {
2111 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
2112 && !overwrite
2113 && existing_venue_order_id != venue_order_id
2114 {
2115 anyhow::bail!(
2116 "Existing {existing_venue_order_id} for {client_order_id}
2117 did not match the given {venue_order_id}.
2118 If you are writing a test then try a different `venue_order_id`,
2119 otherwise this is probably a bug."
2120 );
2121 }
2122
2123 self.index
2124 .client_order_ids
2125 .insert(*client_order_id, *venue_order_id);
2126 self.index
2127 .venue_order_ids
2128 .insert(*venue_order_id, *client_order_id);
2129
2130 Ok(())
2131 }
2132
2133 pub fn add_order(
2145 &mut self,
2146 order: OrderAny,
2147 position_id: Option<PositionId>,
2148 client_id: Option<ClientId>,
2149 replace_existing: bool,
2150 ) -> anyhow::Result<()> {
2151 let instrument_id = order.instrument_id();
2152 let venue = instrument_id.venue;
2153 let client_order_id = order.client_order_id();
2154 let strategy_id = order.strategy_id();
2155 let exec_algorithm_id = order.exec_algorithm_id();
2156 let exec_spawn_id = order.exec_spawn_id();
2157
2158 if !replace_existing {
2159 check_key_not_in_map(
2160 &client_order_id,
2161 &self.orders,
2162 stringify!(client_order_id),
2163 stringify!(orders),
2164 )?;
2165 }
2166
2167 log::debug!("Adding {order:?}");
2168
2169 self.index.orders.insert(client_order_id);
2170
2171 if order.is_active_local() {
2172 self.index.orders_active_local.insert(client_order_id);
2173 }
2174 self.index
2175 .order_strategy
2176 .insert(client_order_id, strategy_id);
2177 self.index.strategies.insert(strategy_id);
2178
2179 self.index
2181 .venue_orders
2182 .entry(venue)
2183 .or_default()
2184 .insert(client_order_id);
2185
2186 self.index
2188 .instrument_orders
2189 .entry(instrument_id)
2190 .or_default()
2191 .insert(client_order_id);
2192
2193 self.index
2195 .strategy_orders
2196 .entry(strategy_id)
2197 .or_default()
2198 .insert(client_order_id);
2199
2200 if let Some(account_id) = order.account_id() {
2202 self.index
2203 .account_orders
2204 .entry(account_id)
2205 .or_default()
2206 .insert(client_order_id);
2207 }
2208
2209 if let Some(exec_algorithm_id) = exec_algorithm_id {
2211 self.index.exec_algorithms.insert(exec_algorithm_id);
2212
2213 self.index
2214 .exec_algorithm_orders
2215 .entry(exec_algorithm_id)
2216 .or_default()
2217 .insert(client_order_id);
2218 }
2219
2220 if let Some(exec_spawn_id) = exec_spawn_id {
2222 self.index
2223 .exec_spawn_orders
2224 .entry(exec_spawn_id)
2225 .or_default()
2226 .insert(client_order_id);
2227 }
2228
2229 if let Some(emulation_trigger) = order.emulation_trigger()
2231 && emulation_trigger != TriggerType::NoTrigger
2232 {
2233 self.index.orders_emulated.insert(client_order_id);
2234 }
2235
2236 if let Some(position_id) = position_id {
2238 self.add_position_id(
2239 &position_id,
2240 &order.instrument_id().venue,
2241 &client_order_id,
2242 &strategy_id,
2243 )?;
2244 }
2245
2246 if let Some(client_id) = client_id {
2248 self.index.order_client.insert(client_order_id, client_id);
2249 log::debug!("Indexed {client_id:?}");
2250 }
2251
2252 if let Some(database) = &mut self.database {
2253 database.add_order(&order, client_id)?;
2254 }
2259
2260 match self.orders.get(&client_order_id) {
2261 Some(order_cell) => *order_cell.borrow_mut() = order,
2264 None => {
2265 self.orders.insert(client_order_id, SharedCell::new(order));
2266 }
2267 }
2268
2269 Ok(())
2270 }
2271
2272 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
2278 let order_list_id = order_list.id;
2279 check_key_not_in_map(
2280 &order_list_id,
2281 &self.order_lists,
2282 stringify!(order_list_id),
2283 stringify!(order_lists),
2284 )?;
2285
2286 log::debug!("Adding {order_list:?}");
2287 self.order_lists.insert(order_list_id, order_list);
2288 Ok(())
2289 }
2290
2291 pub fn add_position_id(
2297 &mut self,
2298 position_id: &PositionId,
2299 venue: &Venue,
2300 client_order_id: &ClientOrderId,
2301 strategy_id: &StrategyId,
2302 ) -> anyhow::Result<()> {
2303 self.index
2304 .order_position
2305 .insert(*client_order_id, *position_id);
2306
2307 if let Some(database) = &mut self.database {
2309 database.index_order_position(*client_order_id, *position_id)?;
2310 }
2311
2312 self.index
2314 .position_strategy
2315 .insert(*position_id, *strategy_id);
2316
2317 self.index
2319 .position_orders
2320 .entry(*position_id)
2321 .or_default()
2322 .insert(*client_order_id);
2323
2324 self.index
2326 .strategy_positions
2327 .entry(*strategy_id)
2328 .or_default()
2329 .insert(*position_id);
2330
2331 self.index
2333 .venue_positions
2334 .entry(*venue)
2335 .or_default()
2336 .insert(*position_id);
2337
2338 Ok(())
2339 }
2340
2341 fn assign_position_ids_to_contingencies(&mut self) {
2350 let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2351
2352 for parent_order_cell in self.orders.values() {
2353 let parent = parent_order_cell.borrow();
2354 if parent.contingency_type() != Some(ContingencyType::Oto) {
2355 continue;
2356 }
2357 let Some(parent_position_id) = parent.position_id() else {
2358 continue;
2359 };
2360 let Some(linked_order_ids) = parent.linked_order_ids() else {
2361 continue;
2362 };
2363
2364 for client_order_id in linked_order_ids {
2365 match self.orders.get(client_order_id) {
2366 None => {
2367 log::error!("Contingency order {client_order_id} not found");
2368 }
2369 Some(contingent_order_cell) => {
2370 if contingent_order_cell.borrow().position_id().is_none() {
2371 assignments.push((parent_position_id, *client_order_id));
2372 }
2373 }
2374 }
2375 }
2376 }
2377
2378 for (position_id, client_order_id) in assignments {
2379 let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
2380 let mut contingent = order_cell.borrow_mut();
2381 contingent.set_position_id(Some(position_id));
2382 (contingent.instrument_id().venue, contingent.strategy_id())
2383 }) else {
2384 continue;
2385 };
2386
2387 self.index
2394 .order_position
2395 .insert(client_order_id, position_id);
2396 self.index
2397 .position_strategy
2398 .insert(position_id, strategy_id);
2399 self.index
2400 .position_orders
2401 .entry(position_id)
2402 .or_default()
2403 .insert(client_order_id);
2404 self.index
2405 .strategy_positions
2406 .entry(strategy_id)
2407 .or_default()
2408 .insert(position_id);
2409 self.index
2410 .venue_positions
2411 .entry(venue)
2412 .or_default()
2413 .insert(position_id);
2414 }
2415 }
2416
2417 pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2423 self.positions
2424 .insert(position.id, SharedCell::new(position.clone()));
2425 self.index.positions.insert(position.id);
2426 self.index.positions_open.insert(position.id);
2427 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
2430
2431 self.add_position_id(
2432 &position.id,
2433 &position.instrument_id.venue,
2434 &position.opening_order_id,
2435 &position.strategy_id,
2436 )?;
2437
2438 let venue = position.instrument_id.venue;
2439 let venue_positions = self.index.venue_positions.entry(venue).or_default();
2440 venue_positions.insert(position.id);
2441
2442 let instrument_id = position.instrument_id;
2444 let instrument_positions = self
2445 .index
2446 .instrument_positions
2447 .entry(instrument_id)
2448 .or_default();
2449 instrument_positions.insert(position.id);
2450
2451 self.index
2453 .account_positions
2454 .entry(position.account_id)
2455 .or_default()
2456 .insert(position.id);
2457
2458 if let Some(database) = &mut self.database {
2459 database.add_position(position)?;
2460 }
2469
2470 Ok(())
2471 }
2472
2473 pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2482 let account_id = account.id();
2483 match self.accounts.get(&account_id) {
2484 Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
2485 None => {
2486 self.accounts
2487 .insert(account_id, SharedCell::new(account.clone()));
2488 }
2489 }
2490
2491 if let Some(database) = &mut self.database {
2492 database.update_account(account)?;
2493 }
2494 Ok(())
2495 }
2496
2497 #[must_use]
2512 pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
2513 self.accounts.remove(account_id).map(|cell| {
2514 let rc: Rc<RefCell<AccountAny>> = cell.into();
2515 Rc::try_unwrap(rc).map_or_else(
2516 |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
2517 RefCell::into_inner,
2518 )
2519 })
2520 }
2521
2522 pub fn cache_account_owned(&mut self, account: AccountAny) {
2524 let account_id = account.id();
2525 self.index
2526 .venue_account
2527 .insert(account_id.get_issuer(), account_id);
2528 match self.accounts.get(&account_id) {
2529 Some(account_cell) => *account_cell.borrow_mut() = account,
2530 None => {
2531 self.accounts.insert(account_id, SharedCell::new(account));
2532 }
2533 }
2534 }
2535
2536 pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
2542 let account_id = account.id();
2543 self.cache_account_owned(account);
2544
2545 if let Some(database) = &mut self.database {
2546 let Some(account_cell) = self.accounts.get(&account_id) else {
2547 anyhow::bail!("Account {account_id} not found after cache update");
2548 };
2549 database.update_account(&account_cell.borrow())?;
2550 }
2551 Ok(())
2552 }
2553
2554 pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
2564 let Some(cell) = self.accounts.get(&event.account_id) else {
2565 return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
2566 };
2567
2568 cell.borrow_mut().apply(event.clone())?;
2569
2570 if let Some(database) = &mut self.database {
2571 database.update_account(&cell.borrow())?;
2572 }
2573 Ok(())
2574 }
2575
2576 pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2585 self.refresh_order(order)?;
2586
2587 let client_order_id = order.client_order_id();
2588 match self.orders.get(&client_order_id) {
2589 Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
2592 None => {
2593 self.orders
2594 .insert(client_order_id, SharedCell::new(order.clone()));
2595 }
2596 }
2597
2598 Ok(())
2599 }
2600
2601 pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
2607 let event_client_order_id = event.client_order_id();
2608 let client_order_id = if self.order_exists(&event_client_order_id) {
2609 event_client_order_id
2610 } else if let Some(venue_order_id) = event.venue_order_id() {
2611 self.index
2612 .venue_order_ids
2613 .get(&venue_order_id)
2614 .copied()
2615 .ok_or(OrderError::NotFound(event_client_order_id))?
2616 } else {
2617 return Err(OrderError::NotFound(event_client_order_id).into());
2618 };
2619
2620 let order_cell = self
2621 .orders
2622 .get(&client_order_id)
2623 .cloned()
2624 .ok_or(OrderError::NotFound(client_order_id))?;
2625
2626 let mut snapshot = order_cell.borrow().clone();
2630 snapshot.apply(event.clone())?;
2631 *order_cell.borrow_mut() = snapshot.clone();
2632
2633 if let Err(e) = self.refresh_order(&snapshot) {
2634 log::error!("Error updating order in cache: {e}");
2635 }
2636
2637 Ok(snapshot)
2638 }
2639
2640 fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2641 let client_order_id = order.client_order_id();
2642
2643 if order.is_active_local() {
2644 self.index.orders_active_local.insert(client_order_id);
2645 } else {
2646 self.index.orders_active_local.remove(&client_order_id);
2647 }
2648
2649 if let Some(venue_order_id) = order.venue_order_id() {
2651 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2654 let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
2655 if let Err(e) =
2656 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
2657 {
2658 log::error!("Error indexing venue order ID in cache: {e}");
2659 }
2660 }
2661 }
2662
2663 if order.is_inflight() {
2665 self.index.orders_inflight.insert(client_order_id);
2666 } else {
2667 self.index.orders_inflight.remove(&client_order_id);
2668 }
2669
2670 if order.is_open() {
2672 self.index.orders_closed.remove(&client_order_id);
2673 self.index.orders_open.insert(client_order_id);
2674 } else if order.is_closed() {
2675 self.index.orders_open.remove(&client_order_id);
2676 self.index.orders_pending_cancel.remove(&client_order_id);
2677 self.index.orders_closed.insert(client_order_id);
2678 }
2679
2680 if let Some(emulation_trigger) = order.emulation_trigger()
2682 && emulation_trigger != TriggerType::NoTrigger
2683 && !order.is_closed()
2684 {
2685 self.index.orders_emulated.insert(client_order_id);
2686 } else {
2687 self.index.orders_emulated.remove(&client_order_id);
2688 }
2689
2690 if let Some(account_id) = order.account_id() {
2692 self.index
2693 .account_orders
2694 .entry(account_id)
2695 .or_default()
2696 .insert(client_order_id);
2697 }
2698
2699 if !self.own_books.is_empty() {
2701 let own_book = self.own_order_book(&order.instrument_id());
2702 if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
2703 self.update_own_order_book(order);
2704 }
2705 }
2706
2707 if let Some(database) = &mut self.database {
2708 database.update_order(order.last_event())?;
2709 }
2714
2715 Ok(())
2716 }
2717
2718 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2720 self.index
2721 .orders_pending_cancel
2722 .insert(order.client_order_id());
2723 }
2724
2725 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2734 if position.is_open() {
2737 self.index.positions_open.insert(position.id);
2738 self.index.positions_closed.remove(&position.id);
2739 } else {
2740 self.index.positions_closed.insert(position.id);
2741 self.index.positions_open.remove(&position.id);
2742 }
2743
2744 if let Some(database) = &mut self.database {
2745 database.update_position(position)?;
2746 }
2751
2752 match self.positions.get(&position.id) {
2753 Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
2754 None => {
2755 self.positions
2756 .insert(position.id, SharedCell::new(position.clone()));
2757 }
2758 }
2759
2760 Ok(())
2761 }
2762
2763 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
2770 let position_id = position.id;
2771
2772 let mut copied_position = position.clone();
2773 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2774 copied_position.id = PositionId::new(new_id);
2775
2776 let position_serialized = serde_json::to_vec(&copied_position)?;
2778 let snapshot_index = self.position_snapshot_count(&position_id);
2779 let blob_ref = format!(
2780 "cache://position-snapshots/{}/{}",
2781 position_id.as_str(),
2782 snapshot_index,
2783 );
2784 let snapshot_blob = Bytes::from(position_serialized);
2785
2786 self.add(&blob_ref, snapshot_blob.clone())?;
2787 self.position_snapshots
2788 .entry(position_id)
2789 .or_default()
2790 .push(snapshot_blob.clone());
2791
2792 log::debug!("Snapshot {copied_position}");
2793 Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
2794 }
2795
2796 pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
2806 if let Some(blob) = self.snapshot_blob(blob_ref) {
2807 return Ok(Some(blob));
2808 }
2809
2810 if self.database.is_some() {
2811 self.cache_general()?;
2812 }
2813
2814 Ok(self.snapshot_blob(blob_ref))
2815 }
2816
2817 pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
2827 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
2828 validate_position_snapshot_blob(&position_id, blob.as_ref())?;
2829
2830 let frames = self.position_snapshots.entry(position_id).or_default();
2831 match frames.get(snapshot_index) {
2832 Some(existing) if existing == &blob => {}
2833 Some(_) => {
2834 anyhow::bail!(
2835 "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
2836 );
2837 }
2838 None if frames.len() == snapshot_index => frames.push(blob.clone()),
2839 None => {
2840 anyhow::bail!(
2841 "position snapshot blob_ref {blob_ref} skips missing frame {}",
2842 frames.len()
2843 );
2844 }
2845 }
2846
2847 self.general.insert(blob_ref.to_string(), blob);
2848 Ok(())
2849 }
2850
2851 fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
2852 if let Some(blob) = self.general.get(blob_ref) {
2853 return Some(blob.clone());
2854 }
2855
2856 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
2857 self.position_snapshots
2858 .get(&position_id)
2859 .and_then(|frames| frames.get(snapshot_index))
2860 .cloned()
2861 }
2862
2863 pub fn snapshot_position_state(
2869 &mut self,
2870 position: &Position,
2871 open_only: Option<bool>,
2874 ) -> anyhow::Result<()> {
2875 let open_only = open_only.unwrap_or(true);
2876
2877 if open_only && !position.is_open() {
2878 return Ok(());
2879 }
2880
2881 if let Some(database) = &mut self.database {
2882 database.snapshot_position_state(position).map_err(|e| {
2883 log::error!(
2884 "Failed to snapshot position state for {}: {e:?}",
2885 position.id
2886 );
2887 e
2888 })?;
2889 } else {
2890 log::warn!(
2891 "Cannot snapshot position state for {} (no database configured)",
2892 position.id
2893 );
2894 }
2895
2896 todo!()
2898 }
2899
2900 #[must_use]
2902 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2903 if self.index.position_strategy.contains_key(position_id) {
2905 Some(OmsType::Netting)
2908 } else {
2909 None
2910 }
2911 }
2912
2913 #[must_use]
2918 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2919 self.position_snapshots
2920 .get(position_id)
2921 .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2922 }
2923
2924 #[must_use]
2928 pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2929 self.position_snapshots.get(position_id).map_or(0, Vec::len)
2930 }
2931
2932 #[must_use]
2938 pub fn position_snapshots(
2939 &self,
2940 position_id: Option<&PositionId>,
2941 account_id: Option<&AccountId>,
2942 ) -> Vec<Position> {
2943 let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2944 Some(pid) => match self.position_snapshots.get(pid) {
2945 Some(v) => Box::new(v.iter()),
2946 None => Box::new(std::iter::empty()),
2947 },
2948 None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2949 };
2950
2951 let mut results: Vec<Position> = frames
2952 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2953 Ok(position) => Some(position),
2954 Err(e) => {
2955 log::warn!("Failed to decode position snapshot: {e}");
2956 None
2957 }
2958 })
2959 .collect();
2960
2961 if let Some(aid) = account_id {
2962 results.retain(|p| p.account_id == *aid);
2963 }
2964
2965 results
2966 }
2967
2968 #[must_use]
2974 pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2975 let Some(frames) = self.position_snapshots.get(position_id) else {
2976 return Vec::new();
2977 };
2978
2979 frames
2980 .iter()
2981 .skip(skip)
2982 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2983 Ok(position) => Some(position),
2984 Err(e) => {
2985 log::warn!("Failed to decode position snapshot: {e}");
2986 None
2987 }
2988 })
2989 .collect()
2990 }
2991
2992 #[must_use]
2994 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2995 let mut result = AHashSet::new();
2997
2998 for (position_id, _) in &self.position_snapshots {
2999 if let Some(position_cell) = self.positions.get(position_id)
3001 && position_cell.borrow().instrument_id == *instrument_id
3002 {
3003 result.insert(*position_id);
3004 }
3005 }
3006 result
3007 }
3008
3009 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
3015 let Some(database) = &self.database else {
3016 log::warn!(
3017 "Cannot snapshot order state for {} (no database configured)",
3018 order.client_order_id()
3019 );
3020 return Ok(());
3021 };
3022
3023 database.snapshot_order_state(order)
3024 }
3025
3026 fn collect_order_filter_sources<'a>(
3037 &'a self,
3038 venue: Option<&Venue>,
3039 instrument_id: Option<&InstrumentId>,
3040 strategy_id: Option<&StrategyId>,
3041 account_id: Option<&AccountId>,
3042 ) -> FilterSources<'a, ClientOrderId> {
3043 let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
3044
3045 if let Some(venue) = venue {
3046 match self.index.venue_orders.get(venue) {
3047 Some(set) => sources.push(set),
3048 None => return FilterSources::Empty,
3049 }
3050 }
3051
3052 if let Some(instrument_id) = instrument_id {
3053 match self.index.instrument_orders.get(instrument_id) {
3054 Some(set) => sources.push(set),
3055 None => return FilterSources::Empty,
3056 }
3057 }
3058
3059 if let Some(strategy_id) = strategy_id {
3060 match self.index.strategy_orders.get(strategy_id) {
3061 Some(set) => sources.push(set),
3062 None => return FilterSources::Empty,
3063 }
3064 }
3065
3066 if let Some(account_id) = account_id {
3067 match self.index.account_orders.get(account_id) {
3068 Some(set) => sources.push(set),
3069 None => return FilterSources::Empty,
3070 }
3071 }
3072
3073 if sources.is_empty() {
3074 FilterSources::Unfiltered
3075 } else {
3076 FilterSources::Sets(sources)
3077 }
3078 }
3079
3080 fn collect_position_filter_sources<'a>(
3081 &'a self,
3082 venue: Option<&Venue>,
3083 instrument_id: Option<&InstrumentId>,
3084 strategy_id: Option<&StrategyId>,
3085 account_id: Option<&AccountId>,
3086 ) -> FilterSources<'a, PositionId> {
3087 let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
3088
3089 if let Some(venue) = venue {
3090 match self.index.venue_positions.get(venue) {
3091 Some(set) => sources.push(set),
3092 None => return FilterSources::Empty,
3093 }
3094 }
3095
3096 if let Some(instrument_id) = instrument_id {
3097 match self.index.instrument_positions.get(instrument_id) {
3098 Some(set) => sources.push(set),
3099 None => return FilterSources::Empty,
3100 }
3101 }
3102
3103 if let Some(strategy_id) = strategy_id {
3104 match self.index.strategy_positions.get(strategy_id) {
3105 Some(set) => sources.push(set),
3106 None => return FilterSources::Empty,
3107 }
3108 }
3109
3110 if let Some(account_id) = account_id {
3111 match self.index.account_positions.get(account_id) {
3112 Some(set) => sources.push(set),
3113 None => return FilterSources::Empty,
3114 }
3115 }
3116
3117 if sources.is_empty() {
3118 FilterSources::Unfiltered
3119 } else {
3120 FilterSources::Sets(sources)
3121 }
3122 }
3123
3124 fn query_orders_in_bucket(
3130 &self,
3131 bucket: &AHashSet<ClientOrderId>,
3132 venue: Option<&Venue>,
3133 instrument_id: Option<&InstrumentId>,
3134 strategy_id: Option<&StrategyId>,
3135 account_id: Option<&AccountId>,
3136 ) -> AHashSet<ClientOrderId> {
3137 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3138 FilterSources::Empty => AHashSet::new(),
3139 FilterSources::Unfiltered => bucket.clone(),
3140 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3141 }
3142 }
3143
3144 fn query_positions_in_bucket(
3145 &self,
3146 bucket: &AHashSet<PositionId>,
3147 venue: Option<&Venue>,
3148 instrument_id: Option<&InstrumentId>,
3149 strategy_id: Option<&StrategyId>,
3150 account_id: Option<&AccountId>,
3151 ) -> AHashSet<PositionId> {
3152 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3153 FilterSources::Empty => AHashSet::new(),
3154 FilterSources::Unfiltered => bucket.clone(),
3155 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3156 }
3157 }
3158
3159 fn view_orders_in_bucket<'a>(
3162 &'a self,
3163 bucket: &'a AHashSet<ClientOrderId>,
3164 venue: Option<&Venue>,
3165 instrument_id: Option<&InstrumentId>,
3166 strategy_id: Option<&StrategyId>,
3167 account_id: Option<&AccountId>,
3168 ) -> Cow<'a, AHashSet<ClientOrderId>> {
3169 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3170 FilterSources::Empty => Cow::Owned(AHashSet::new()),
3171 FilterSources::Unfiltered => Cow::Borrowed(bucket),
3172 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3173 }
3174 }
3175
3176 fn view_positions_in_bucket<'a>(
3177 &'a self,
3178 bucket: &'a AHashSet<PositionId>,
3179 venue: Option<&Venue>,
3180 instrument_id: Option<&InstrumentId>,
3181 strategy_id: Option<&StrategyId>,
3182 account_id: Option<&AccountId>,
3183 ) -> Cow<'a, AHashSet<PositionId>> {
3184 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3185 FilterSources::Empty => Cow::Owned(AHashSet::new()),
3186 FilterSources::Unfiltered => Cow::Borrowed(bucket),
3187 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3188 }
3189 }
3190
3191 fn iter_orders_in_bucket<'a>(
3196 &'a self,
3197 bucket: &'a AHashSet<ClientOrderId>,
3198 venue: Option<&Venue>,
3199 instrument_id: Option<&InstrumentId>,
3200 strategy_id: Option<&StrategyId>,
3201 account_id: Option<&AccountId>,
3202 ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
3203 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3204 FilterSources::Empty => Box::new(std::iter::empty()),
3205 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3206 FilterSources::Sets(mut sources) => {
3207 sources.push(bucket);
3208 sources.sort_unstable_by_key(|s| s.len());
3209 let driver = sources[0];
3210 let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
3211 Box::new(
3212 driver
3213 .iter()
3214 .copied()
3215 .filter(move |id| rest.iter().all(|s| s.contains(id))),
3216 )
3217 }
3218 }
3219 }
3220
3221 fn iter_positions_in_bucket<'a>(
3222 &'a self,
3223 bucket: &'a AHashSet<PositionId>,
3224 venue: Option<&Venue>,
3225 instrument_id: Option<&InstrumentId>,
3226 strategy_id: Option<&StrategyId>,
3227 account_id: Option<&AccountId>,
3228 ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
3229 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3230 FilterSources::Empty => Box::new(std::iter::empty()),
3231 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3232 FilterSources::Sets(mut sources) => {
3233 sources.push(bucket);
3234 sources.sort_unstable_by_key(|s| s.len());
3235 let driver = sources[0];
3236 let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
3237 Box::new(
3238 driver
3239 .iter()
3240 .copied()
3241 .filter(move |id| rest.iter().all(|s| s.contains(id))),
3242 )
3243 }
3244 }
3245 }
3246
3247 fn count_orders_in_bucket(
3253 &self,
3254 bucket: &AHashSet<ClientOrderId>,
3255 venue: Option<&Venue>,
3256 instrument_id: Option<&InstrumentId>,
3257 strategy_id: Option<&StrategyId>,
3258 account_id: Option<&AccountId>,
3259 side: Option<OrderSide>,
3260 ) -> usize {
3261 let side = side.unwrap_or(OrderSide::NoOrderSide);
3262
3263 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3264 FilterSources::Empty => 0,
3265 FilterSources::Unfiltered => {
3266 if side == OrderSide::NoOrderSide {
3267 bucket.len()
3268 } else {
3269 bucket
3270 .iter()
3271 .filter(|id| self.order_side_matches(id, side))
3272 .count()
3273 }
3274 }
3275 FilterSources::Sets(mut sources) => {
3276 sources.push(bucket);
3277 sources.sort_unstable_by_key(|s| s.len());
3278 let driver = sources[0];
3279 let rest = &sources[1..];
3280
3281 driver
3282 .iter()
3283 .filter(|id| rest.iter().all(|s| s.contains(id)))
3284 .filter(|id| {
3285 side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
3286 })
3287 .count()
3288 }
3289 }
3290 }
3291
3292 fn count_positions_in_bucket(
3293 &self,
3294 bucket: &AHashSet<PositionId>,
3295 venue: Option<&Venue>,
3296 instrument_id: Option<&InstrumentId>,
3297 strategy_id: Option<&StrategyId>,
3298 account_id: Option<&AccountId>,
3299 side: Option<PositionSide>,
3300 ) -> usize {
3301 let side = side.unwrap_or(PositionSide::NoPositionSide);
3302
3303 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3304 FilterSources::Empty => 0,
3305 FilterSources::Unfiltered => {
3306 if side == PositionSide::NoPositionSide {
3307 bucket.len()
3308 } else {
3309 bucket
3310 .iter()
3311 .filter(|id| self.position_side_matches(id, side))
3312 .count()
3313 }
3314 }
3315 FilterSources::Sets(mut sources) => {
3316 sources.push(bucket);
3317 sources.sort_unstable_by_key(|s| s.len());
3318 let driver = sources[0];
3319 let rest = &sources[1..];
3320
3321 driver
3322 .iter()
3323 .filter(|id| rest.iter().all(|s| s.contains(id)))
3324 .filter(|id| {
3325 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3326 })
3327 .count()
3328 }
3329 }
3330 }
3331
3332 fn any_orders_in_bucket(
3338 &self,
3339 bucket: &AHashSet<ClientOrderId>,
3340 venue: Option<&Venue>,
3341 instrument_id: Option<&InstrumentId>,
3342 strategy_id: Option<&StrategyId>,
3343 account_id: Option<&AccountId>,
3344 side: Option<OrderSide>,
3345 ) -> bool {
3346 let side = side.unwrap_or(OrderSide::NoOrderSide);
3347
3348 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3349 FilterSources::Empty => false,
3350 FilterSources::Unfiltered => {
3351 if side == OrderSide::NoOrderSide {
3352 !bucket.is_empty()
3353 } else {
3354 bucket.iter().any(|id| self.order_side_matches(id, side))
3355 }
3356 }
3357 FilterSources::Sets(mut sources) => {
3358 sources.push(bucket);
3359 sources.sort_unstable_by_key(|s| s.len());
3360 let driver = sources[0];
3361 let rest = &sources[1..];
3362
3363 driver
3364 .iter()
3365 .filter(|id| rest.iter().all(|s| s.contains(id)))
3366 .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
3367 }
3368 }
3369 }
3370
3371 fn any_positions_in_bucket(
3372 &self,
3373 bucket: &AHashSet<PositionId>,
3374 venue: Option<&Venue>,
3375 instrument_id: Option<&InstrumentId>,
3376 strategy_id: Option<&StrategyId>,
3377 account_id: Option<&AccountId>,
3378 side: Option<PositionSide>,
3379 ) -> bool {
3380 let side = side.unwrap_or(PositionSide::NoPositionSide);
3381
3382 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3383 FilterSources::Empty => false,
3384 FilterSources::Unfiltered => {
3385 if side == PositionSide::NoPositionSide {
3386 !bucket.is_empty()
3387 } else {
3388 bucket.iter().any(|id| self.position_side_matches(id, side))
3389 }
3390 }
3391 FilterSources::Sets(mut sources) => {
3392 sources.push(bucket);
3393 sources.sort_unstable_by_key(|s| s.len());
3394 let driver = sources[0];
3395 let rest = &sources[1..];
3396
3397 driver
3398 .iter()
3399 .filter(|id| rest.iter().all(|s| s.contains(id)))
3400 .any(|id| {
3401 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3402 })
3403 }
3404 }
3405 }
3406
3407 fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
3408 self.orders
3409 .get(client_order_id)
3410 .is_some_and(|cell| cell.borrow().order_side() == side)
3411 }
3412
3413 fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
3414 self.positions
3415 .get(position_id)
3416 .is_some_and(|cell| cell.borrow().side == side)
3417 }
3418
3419 fn get_orders_for_ids(
3425 &self,
3426 client_order_ids: &AHashSet<ClientOrderId>,
3427 side: Option<OrderSide>,
3428 ) -> Vec<OrderRef<'_>> {
3429 let side = side.unwrap_or(OrderSide::NoOrderSide);
3430 let mut orders = Vec::new();
3431
3432 for client_order_id in client_order_ids {
3433 let order_cell = self
3434 .orders
3435 .get(client_order_id)
3436 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
3437 let order = OrderRef::new(order_cell.borrow());
3438
3439 if side == OrderSide::NoOrderSide || side == order.order_side() {
3440 orders.push(order);
3441 }
3442 }
3443
3444 orders.sort_by_key(|o| o.client_order_id());
3447 orders
3448 }
3449
3450 fn get_positions_for_ids(
3460 &self,
3461 position_ids: &AHashSet<PositionId>,
3462 side: Option<PositionSide>,
3463 ) -> Vec<PositionRef<'_>> {
3464 let side = side.unwrap_or(PositionSide::NoPositionSide);
3465 let mut positions = Vec::new();
3466
3467 for position_id in position_ids {
3468 let position_cell = self
3469 .positions
3470 .get(position_id)
3471 .unwrap_or_else(|| panic!("Position {position_id} not found"));
3472 let position = PositionRef::new(position_cell.borrow());
3473
3474 if side == PositionSide::NoPositionSide || side == position.side {
3475 positions.push(position);
3476 }
3477 }
3478
3479 positions.sort_by_key(|p| p.id);
3482 positions
3483 }
3484
3485 #[must_use]
3487 pub fn client_order_ids(
3488 &self,
3489 venue: Option<&Venue>,
3490 instrument_id: Option<&InstrumentId>,
3491 strategy_id: Option<&StrategyId>,
3492 account_id: Option<&AccountId>,
3493 ) -> AHashSet<ClientOrderId> {
3494 self.query_orders_in_bucket(
3495 &self.index.orders,
3496 venue,
3497 instrument_id,
3498 strategy_id,
3499 account_id,
3500 )
3501 }
3502
3503 #[must_use]
3505 pub fn client_order_ids_open(
3506 &self,
3507 venue: Option<&Venue>,
3508 instrument_id: Option<&InstrumentId>,
3509 strategy_id: Option<&StrategyId>,
3510 account_id: Option<&AccountId>,
3511 ) -> AHashSet<ClientOrderId> {
3512 self.query_orders_in_bucket(
3513 &self.index.orders_open,
3514 venue,
3515 instrument_id,
3516 strategy_id,
3517 account_id,
3518 )
3519 }
3520
3521 #[must_use]
3523 pub fn client_order_ids_closed(
3524 &self,
3525 venue: Option<&Venue>,
3526 instrument_id: Option<&InstrumentId>,
3527 strategy_id: Option<&StrategyId>,
3528 account_id: Option<&AccountId>,
3529 ) -> AHashSet<ClientOrderId> {
3530 self.query_orders_in_bucket(
3531 &self.index.orders_closed,
3532 venue,
3533 instrument_id,
3534 strategy_id,
3535 account_id,
3536 )
3537 }
3538
3539 #[must_use]
3544 pub fn client_order_ids_active_local(
3545 &self,
3546 venue: Option<&Venue>,
3547 instrument_id: Option<&InstrumentId>,
3548 strategy_id: Option<&StrategyId>,
3549 account_id: Option<&AccountId>,
3550 ) -> AHashSet<ClientOrderId> {
3551 self.query_orders_in_bucket(
3552 &self.index.orders_active_local,
3553 venue,
3554 instrument_id,
3555 strategy_id,
3556 account_id,
3557 )
3558 }
3559
3560 #[must_use]
3562 pub fn client_order_ids_emulated(
3563 &self,
3564 venue: Option<&Venue>,
3565 instrument_id: Option<&InstrumentId>,
3566 strategy_id: Option<&StrategyId>,
3567 account_id: Option<&AccountId>,
3568 ) -> AHashSet<ClientOrderId> {
3569 self.query_orders_in_bucket(
3570 &self.index.orders_emulated,
3571 venue,
3572 instrument_id,
3573 strategy_id,
3574 account_id,
3575 )
3576 }
3577
3578 #[must_use]
3580 pub fn client_order_ids_inflight(
3581 &self,
3582 venue: Option<&Venue>,
3583 instrument_id: Option<&InstrumentId>,
3584 strategy_id: Option<&StrategyId>,
3585 account_id: Option<&AccountId>,
3586 ) -> AHashSet<ClientOrderId> {
3587 self.query_orders_in_bucket(
3588 &self.index.orders_inflight,
3589 venue,
3590 instrument_id,
3591 strategy_id,
3592 account_id,
3593 )
3594 }
3595
3596 #[must_use]
3598 pub fn position_ids(
3599 &self,
3600 venue: Option<&Venue>,
3601 instrument_id: Option<&InstrumentId>,
3602 strategy_id: Option<&StrategyId>,
3603 account_id: Option<&AccountId>,
3604 ) -> AHashSet<PositionId> {
3605 self.query_positions_in_bucket(
3606 &self.index.positions,
3607 venue,
3608 instrument_id,
3609 strategy_id,
3610 account_id,
3611 )
3612 }
3613
3614 #[must_use]
3616 pub fn position_open_ids(
3617 &self,
3618 venue: Option<&Venue>,
3619 instrument_id: Option<&InstrumentId>,
3620 strategy_id: Option<&StrategyId>,
3621 account_id: Option<&AccountId>,
3622 ) -> AHashSet<PositionId> {
3623 self.query_positions_in_bucket(
3624 &self.index.positions_open,
3625 venue,
3626 instrument_id,
3627 strategy_id,
3628 account_id,
3629 )
3630 }
3631
3632 #[must_use]
3634 pub fn position_closed_ids(
3635 &self,
3636 venue: Option<&Venue>,
3637 instrument_id: Option<&InstrumentId>,
3638 strategy_id: Option<&StrategyId>,
3639 account_id: Option<&AccountId>,
3640 ) -> AHashSet<PositionId> {
3641 self.query_positions_in_bucket(
3642 &self.index.positions_closed,
3643 venue,
3644 instrument_id,
3645 strategy_id,
3646 account_id,
3647 )
3648 }
3649
3650 #[must_use]
3657 pub fn client_order_ids_view(
3658 &self,
3659 venue: Option<&Venue>,
3660 instrument_id: Option<&InstrumentId>,
3661 strategy_id: Option<&StrategyId>,
3662 account_id: Option<&AccountId>,
3663 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3664 self.view_orders_in_bucket(
3665 &self.index.orders,
3666 venue,
3667 instrument_id,
3668 strategy_id,
3669 account_id,
3670 )
3671 }
3672
3673 #[must_use]
3675 pub fn client_order_ids_open_view(
3676 &self,
3677 venue: Option<&Venue>,
3678 instrument_id: Option<&InstrumentId>,
3679 strategy_id: Option<&StrategyId>,
3680 account_id: Option<&AccountId>,
3681 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3682 self.view_orders_in_bucket(
3683 &self.index.orders_open,
3684 venue,
3685 instrument_id,
3686 strategy_id,
3687 account_id,
3688 )
3689 }
3690
3691 #[must_use]
3693 pub fn client_order_ids_closed_view(
3694 &self,
3695 venue: Option<&Venue>,
3696 instrument_id: Option<&InstrumentId>,
3697 strategy_id: Option<&StrategyId>,
3698 account_id: Option<&AccountId>,
3699 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3700 self.view_orders_in_bucket(
3701 &self.index.orders_closed,
3702 venue,
3703 instrument_id,
3704 strategy_id,
3705 account_id,
3706 )
3707 }
3708
3709 #[must_use]
3711 pub fn client_order_ids_active_local_view(
3712 &self,
3713 venue: Option<&Venue>,
3714 instrument_id: Option<&InstrumentId>,
3715 strategy_id: Option<&StrategyId>,
3716 account_id: Option<&AccountId>,
3717 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3718 self.view_orders_in_bucket(
3719 &self.index.orders_active_local,
3720 venue,
3721 instrument_id,
3722 strategy_id,
3723 account_id,
3724 )
3725 }
3726
3727 #[must_use]
3729 pub fn client_order_ids_emulated_view(
3730 &self,
3731 venue: Option<&Venue>,
3732 instrument_id: Option<&InstrumentId>,
3733 strategy_id: Option<&StrategyId>,
3734 account_id: Option<&AccountId>,
3735 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3736 self.view_orders_in_bucket(
3737 &self.index.orders_emulated,
3738 venue,
3739 instrument_id,
3740 strategy_id,
3741 account_id,
3742 )
3743 }
3744
3745 #[must_use]
3747 pub fn client_order_ids_inflight_view(
3748 &self,
3749 venue: Option<&Venue>,
3750 instrument_id: Option<&InstrumentId>,
3751 strategy_id: Option<&StrategyId>,
3752 account_id: Option<&AccountId>,
3753 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3754 self.view_orders_in_bucket(
3755 &self.index.orders_inflight,
3756 venue,
3757 instrument_id,
3758 strategy_id,
3759 account_id,
3760 )
3761 }
3762
3763 #[must_use]
3765 pub fn position_ids_view(
3766 &self,
3767 venue: Option<&Venue>,
3768 instrument_id: Option<&InstrumentId>,
3769 strategy_id: Option<&StrategyId>,
3770 account_id: Option<&AccountId>,
3771 ) -> Cow<'_, AHashSet<PositionId>> {
3772 self.view_positions_in_bucket(
3773 &self.index.positions,
3774 venue,
3775 instrument_id,
3776 strategy_id,
3777 account_id,
3778 )
3779 }
3780
3781 #[must_use]
3783 pub fn position_open_ids_view(
3784 &self,
3785 venue: Option<&Venue>,
3786 instrument_id: Option<&InstrumentId>,
3787 strategy_id: Option<&StrategyId>,
3788 account_id: Option<&AccountId>,
3789 ) -> Cow<'_, AHashSet<PositionId>> {
3790 self.view_positions_in_bucket(
3791 &self.index.positions_open,
3792 venue,
3793 instrument_id,
3794 strategy_id,
3795 account_id,
3796 )
3797 }
3798
3799 #[must_use]
3801 pub fn position_closed_ids_view(
3802 &self,
3803 venue: Option<&Venue>,
3804 instrument_id: Option<&InstrumentId>,
3805 strategy_id: Option<&StrategyId>,
3806 account_id: Option<&AccountId>,
3807 ) -> Cow<'_, AHashSet<PositionId>> {
3808 self.view_positions_in_bucket(
3809 &self.index.positions_closed,
3810 venue,
3811 instrument_id,
3812 strategy_id,
3813 account_id,
3814 )
3815 }
3816
3817 pub fn iter_client_order_ids(
3823 &self,
3824 venue: Option<&Venue>,
3825 instrument_id: Option<&InstrumentId>,
3826 strategy_id: Option<&StrategyId>,
3827 account_id: Option<&AccountId>,
3828 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3829 self.iter_orders_in_bucket(
3830 &self.index.orders,
3831 venue,
3832 instrument_id,
3833 strategy_id,
3834 account_id,
3835 )
3836 }
3837
3838 pub fn iter_client_order_ids_open(
3840 &self,
3841 venue: Option<&Venue>,
3842 instrument_id: Option<&InstrumentId>,
3843 strategy_id: Option<&StrategyId>,
3844 account_id: Option<&AccountId>,
3845 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3846 self.iter_orders_in_bucket(
3847 &self.index.orders_open,
3848 venue,
3849 instrument_id,
3850 strategy_id,
3851 account_id,
3852 )
3853 }
3854
3855 pub fn iter_client_order_ids_closed(
3857 &self,
3858 venue: Option<&Venue>,
3859 instrument_id: Option<&InstrumentId>,
3860 strategy_id: Option<&StrategyId>,
3861 account_id: Option<&AccountId>,
3862 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3863 self.iter_orders_in_bucket(
3864 &self.index.orders_closed,
3865 venue,
3866 instrument_id,
3867 strategy_id,
3868 account_id,
3869 )
3870 }
3871
3872 pub fn iter_client_order_ids_active_local(
3874 &self,
3875 venue: Option<&Venue>,
3876 instrument_id: Option<&InstrumentId>,
3877 strategy_id: Option<&StrategyId>,
3878 account_id: Option<&AccountId>,
3879 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3880 self.iter_orders_in_bucket(
3881 &self.index.orders_active_local,
3882 venue,
3883 instrument_id,
3884 strategy_id,
3885 account_id,
3886 )
3887 }
3888
3889 pub fn iter_client_order_ids_emulated(
3891 &self,
3892 venue: Option<&Venue>,
3893 instrument_id: Option<&InstrumentId>,
3894 strategy_id: Option<&StrategyId>,
3895 account_id: Option<&AccountId>,
3896 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3897 self.iter_orders_in_bucket(
3898 &self.index.orders_emulated,
3899 venue,
3900 instrument_id,
3901 strategy_id,
3902 account_id,
3903 )
3904 }
3905
3906 pub fn iter_client_order_ids_inflight(
3908 &self,
3909 venue: Option<&Venue>,
3910 instrument_id: Option<&InstrumentId>,
3911 strategy_id: Option<&StrategyId>,
3912 account_id: Option<&AccountId>,
3913 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3914 self.iter_orders_in_bucket(
3915 &self.index.orders_inflight,
3916 venue,
3917 instrument_id,
3918 strategy_id,
3919 account_id,
3920 )
3921 }
3922
3923 pub fn iter_position_ids(
3925 &self,
3926 venue: Option<&Venue>,
3927 instrument_id: Option<&InstrumentId>,
3928 strategy_id: Option<&StrategyId>,
3929 account_id: Option<&AccountId>,
3930 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3931 self.iter_positions_in_bucket(
3932 &self.index.positions,
3933 venue,
3934 instrument_id,
3935 strategy_id,
3936 account_id,
3937 )
3938 }
3939
3940 pub fn iter_position_open_ids(
3942 &self,
3943 venue: Option<&Venue>,
3944 instrument_id: Option<&InstrumentId>,
3945 strategy_id: Option<&StrategyId>,
3946 account_id: Option<&AccountId>,
3947 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3948 self.iter_positions_in_bucket(
3949 &self.index.positions_open,
3950 venue,
3951 instrument_id,
3952 strategy_id,
3953 account_id,
3954 )
3955 }
3956
3957 pub fn iter_position_closed_ids(
3959 &self,
3960 venue: Option<&Venue>,
3961 instrument_id: Option<&InstrumentId>,
3962 strategy_id: Option<&StrategyId>,
3963 account_id: Option<&AccountId>,
3964 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3965 self.iter_positions_in_bucket(
3966 &self.index.positions_closed,
3967 venue,
3968 instrument_id,
3969 strategy_id,
3970 account_id,
3971 )
3972 }
3973
3974 #[must_use]
3976 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
3977 self.index.actors.clone()
3978 }
3979
3980 #[must_use]
3982 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
3983 self.index.strategies.clone()
3984 }
3985
3986 #[must_use]
3988 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
3989 self.index.exec_algorithms.clone()
3990 }
3991
3992 #[must_use]
4001 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
4002 self.orders
4003 .get(client_order_id)
4004 .map(|order_cell| OrderRef::new(order_cell.borrow()))
4005 }
4006
4007 #[must_use]
4017 pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
4018 self.orders
4019 .get(client_order_id)
4020 .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
4021 }
4022
4023 #[must_use]
4028 pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
4029 self.orders
4030 .get(client_order_id)
4031 .map(|order_cell| order_cell.borrow().clone())
4032 }
4033
4034 #[must_use]
4036 pub fn orders_for_ids(
4037 &self,
4038 client_order_ids: &[ClientOrderId],
4039 context: &dyn Display,
4040 ) -> Vec<OrderAny> {
4041 let mut orders = Vec::with_capacity(client_order_ids.len());
4042 for id in client_order_ids {
4043 match self.orders.get(id) {
4044 Some(order_cell) => orders.push(order_cell.borrow().clone()),
4045 None => log::error!("Order {id} not found in cache for {context}"),
4046 }
4047 }
4048 orders
4049 }
4050
4051 #[must_use]
4053 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
4054 self.index.venue_order_ids.get(venue_order_id)
4055 }
4056
4057 #[must_use]
4059 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
4060 self.index.client_order_ids.get(client_order_id)
4061 }
4062
4063 #[must_use]
4065 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
4066 self.index.order_client.get(client_order_id)
4067 }
4068
4069 #[must_use]
4075 pub fn orders(
4076 &self,
4077 venue: Option<&Venue>,
4078 instrument_id: Option<&InstrumentId>,
4079 strategy_id: Option<&StrategyId>,
4080 account_id: Option<&AccountId>,
4081 side: Option<OrderSide>,
4082 ) -> Vec<OrderRef<'_>> {
4083 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
4084 self.get_orders_for_ids(&client_order_ids, side)
4085 }
4086
4087 #[must_use]
4089 pub fn orders_open(
4090 &self,
4091 venue: Option<&Venue>,
4092 instrument_id: Option<&InstrumentId>,
4093 strategy_id: Option<&StrategyId>,
4094 account_id: Option<&AccountId>,
4095 side: Option<OrderSide>,
4096 ) -> Vec<OrderRef<'_>> {
4097 let client_order_ids =
4098 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
4099 self.get_orders_for_ids(&client_order_ids, side)
4100 }
4101
4102 #[must_use]
4104 pub fn orders_closed(
4105 &self,
4106 venue: Option<&Venue>,
4107 instrument_id: Option<&InstrumentId>,
4108 strategy_id: Option<&StrategyId>,
4109 account_id: Option<&AccountId>,
4110 side: Option<OrderSide>,
4111 ) -> Vec<OrderRef<'_>> {
4112 let client_order_ids =
4113 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
4114 self.get_orders_for_ids(&client_order_ids, side)
4115 }
4116
4117 #[must_use]
4122 pub fn orders_active_local(
4123 &self,
4124 venue: Option<&Venue>,
4125 instrument_id: Option<&InstrumentId>,
4126 strategy_id: Option<&StrategyId>,
4127 account_id: Option<&AccountId>,
4128 side: Option<OrderSide>,
4129 ) -> Vec<OrderRef<'_>> {
4130 let client_order_ids =
4131 self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
4132 self.get_orders_for_ids(&client_order_ids, side)
4133 }
4134
4135 #[must_use]
4137 pub fn orders_emulated(
4138 &self,
4139 venue: Option<&Venue>,
4140 instrument_id: Option<&InstrumentId>,
4141 strategy_id: Option<&StrategyId>,
4142 account_id: Option<&AccountId>,
4143 side: Option<OrderSide>,
4144 ) -> Vec<OrderRef<'_>> {
4145 let client_order_ids =
4146 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
4147 self.get_orders_for_ids(&client_order_ids, side)
4148 }
4149
4150 #[must_use]
4152 pub fn orders_inflight(
4153 &self,
4154 venue: Option<&Venue>,
4155 instrument_id: Option<&InstrumentId>,
4156 strategy_id: Option<&StrategyId>,
4157 account_id: Option<&AccountId>,
4158 side: Option<OrderSide>,
4159 ) -> Vec<OrderRef<'_>> {
4160 let client_order_ids =
4161 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
4162 self.get_orders_for_ids(&client_order_ids, side)
4163 }
4164
4165 #[must_use]
4167 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
4168 match self.index.position_orders.get(position_id) {
4169 Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
4170 None => Vec::new(),
4171 }
4172 }
4173
4174 #[must_use]
4176 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
4177 self.index.orders.contains(client_order_id)
4178 }
4179
4180 #[must_use]
4182 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
4183 self.index.orders_open.contains(client_order_id)
4184 }
4185
4186 #[must_use]
4188 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
4189 self.index.orders_closed.contains(client_order_id)
4190 }
4191
4192 #[must_use]
4197 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
4198 self.index.orders_active_local.contains(client_order_id)
4199 }
4200
4201 #[must_use]
4203 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
4204 self.index.orders_emulated.contains(client_order_id)
4205 }
4206
4207 #[must_use]
4209 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
4210 self.index.orders_inflight.contains(client_order_id)
4211 }
4212
4213 #[must_use]
4215 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
4216 self.index.orders_pending_cancel.contains(client_order_id)
4217 }
4218
4219 #[must_use]
4221 pub fn orders_open_count(
4222 &self,
4223 venue: Option<&Venue>,
4224 instrument_id: Option<&InstrumentId>,
4225 strategy_id: Option<&StrategyId>,
4226 account_id: Option<&AccountId>,
4227 side: Option<OrderSide>,
4228 ) -> usize {
4229 self.count_orders_in_bucket(
4230 &self.index.orders_open,
4231 venue,
4232 instrument_id,
4233 strategy_id,
4234 account_id,
4235 side,
4236 )
4237 }
4238
4239 #[must_use]
4241 pub fn orders_closed_count(
4242 &self,
4243 venue: Option<&Venue>,
4244 instrument_id: Option<&InstrumentId>,
4245 strategy_id: Option<&StrategyId>,
4246 account_id: Option<&AccountId>,
4247 side: Option<OrderSide>,
4248 ) -> usize {
4249 self.count_orders_in_bucket(
4250 &self.index.orders_closed,
4251 venue,
4252 instrument_id,
4253 strategy_id,
4254 account_id,
4255 side,
4256 )
4257 }
4258
4259 #[must_use]
4264 pub fn orders_active_local_count(
4265 &self,
4266 venue: Option<&Venue>,
4267 instrument_id: Option<&InstrumentId>,
4268 strategy_id: Option<&StrategyId>,
4269 account_id: Option<&AccountId>,
4270 side: Option<OrderSide>,
4271 ) -> usize {
4272 self.count_orders_in_bucket(
4273 &self.index.orders_active_local,
4274 venue,
4275 instrument_id,
4276 strategy_id,
4277 account_id,
4278 side,
4279 )
4280 }
4281
4282 #[must_use]
4284 pub fn orders_emulated_count(
4285 &self,
4286 venue: Option<&Venue>,
4287 instrument_id: Option<&InstrumentId>,
4288 strategy_id: Option<&StrategyId>,
4289 account_id: Option<&AccountId>,
4290 side: Option<OrderSide>,
4291 ) -> usize {
4292 self.count_orders_in_bucket(
4293 &self.index.orders_emulated,
4294 venue,
4295 instrument_id,
4296 strategy_id,
4297 account_id,
4298 side,
4299 )
4300 }
4301
4302 #[must_use]
4304 pub fn orders_inflight_count(
4305 &self,
4306 venue: Option<&Venue>,
4307 instrument_id: Option<&InstrumentId>,
4308 strategy_id: Option<&StrategyId>,
4309 account_id: Option<&AccountId>,
4310 side: Option<OrderSide>,
4311 ) -> usize {
4312 self.count_orders_in_bucket(
4313 &self.index.orders_inflight,
4314 venue,
4315 instrument_id,
4316 strategy_id,
4317 account_id,
4318 side,
4319 )
4320 }
4321
4322 #[must_use]
4324 pub fn orders_total_count(
4325 &self,
4326 venue: Option<&Venue>,
4327 instrument_id: Option<&InstrumentId>,
4328 strategy_id: Option<&StrategyId>,
4329 account_id: Option<&AccountId>,
4330 side: Option<OrderSide>,
4331 ) -> usize {
4332 self.count_orders_in_bucket(
4333 &self.index.orders,
4334 venue,
4335 instrument_id,
4336 strategy_id,
4337 account_id,
4338 side,
4339 )
4340 }
4341
4342 #[must_use]
4348 pub fn has_orders_open(
4349 &self,
4350 venue: Option<&Venue>,
4351 instrument_id: Option<&InstrumentId>,
4352 strategy_id: Option<&StrategyId>,
4353 account_id: Option<&AccountId>,
4354 side: Option<OrderSide>,
4355 ) -> bool {
4356 self.any_orders_in_bucket(
4357 &self.index.orders_open,
4358 venue,
4359 instrument_id,
4360 strategy_id,
4361 account_id,
4362 side,
4363 )
4364 }
4365
4366 #[must_use]
4368 pub fn has_orders_closed(
4369 &self,
4370 venue: Option<&Venue>,
4371 instrument_id: Option<&InstrumentId>,
4372 strategy_id: Option<&StrategyId>,
4373 account_id: Option<&AccountId>,
4374 side: Option<OrderSide>,
4375 ) -> bool {
4376 self.any_orders_in_bucket(
4377 &self.index.orders_closed,
4378 venue,
4379 instrument_id,
4380 strategy_id,
4381 account_id,
4382 side,
4383 )
4384 }
4385
4386 #[must_use]
4390 pub fn has_orders_active_local(
4391 &self,
4392 venue: Option<&Venue>,
4393 instrument_id: Option<&InstrumentId>,
4394 strategy_id: Option<&StrategyId>,
4395 account_id: Option<&AccountId>,
4396 side: Option<OrderSide>,
4397 ) -> bool {
4398 self.any_orders_in_bucket(
4399 &self.index.orders_active_local,
4400 venue,
4401 instrument_id,
4402 strategy_id,
4403 account_id,
4404 side,
4405 )
4406 }
4407
4408 #[must_use]
4410 pub fn has_orders_emulated(
4411 &self,
4412 venue: Option<&Venue>,
4413 instrument_id: Option<&InstrumentId>,
4414 strategy_id: Option<&StrategyId>,
4415 account_id: Option<&AccountId>,
4416 side: Option<OrderSide>,
4417 ) -> bool {
4418 self.any_orders_in_bucket(
4419 &self.index.orders_emulated,
4420 venue,
4421 instrument_id,
4422 strategy_id,
4423 account_id,
4424 side,
4425 )
4426 }
4427
4428 #[must_use]
4430 pub fn has_orders_inflight(
4431 &self,
4432 venue: Option<&Venue>,
4433 instrument_id: Option<&InstrumentId>,
4434 strategy_id: Option<&StrategyId>,
4435 account_id: Option<&AccountId>,
4436 side: Option<OrderSide>,
4437 ) -> bool {
4438 self.any_orders_in_bucket(
4439 &self.index.orders_inflight,
4440 venue,
4441 instrument_id,
4442 strategy_id,
4443 account_id,
4444 side,
4445 )
4446 }
4447
4448 #[must_use]
4450 pub fn has_orders(
4451 &self,
4452 venue: Option<&Venue>,
4453 instrument_id: Option<&InstrumentId>,
4454 strategy_id: Option<&StrategyId>,
4455 account_id: Option<&AccountId>,
4456 side: Option<OrderSide>,
4457 ) -> bool {
4458 self.any_orders_in_bucket(
4459 &self.index.orders,
4460 venue,
4461 instrument_id,
4462 strategy_id,
4463 account_id,
4464 side,
4465 )
4466 }
4467
4468 #[must_use]
4470 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
4471 self.order_lists.get(order_list_id)
4472 }
4473
4474 #[must_use]
4476 pub fn order_lists(
4477 &self,
4478 venue: Option<&Venue>,
4479 instrument_id: Option<&InstrumentId>,
4480 strategy_id: Option<&StrategyId>,
4481 account_id: Option<&AccountId>,
4482 ) -> Vec<&OrderList> {
4483 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
4484
4485 if let Some(venue) = venue {
4486 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
4487 }
4488
4489 if let Some(instrument_id) = instrument_id {
4490 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
4491 }
4492
4493 if let Some(strategy_id) = strategy_id {
4494 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
4495 }
4496
4497 if let Some(account_id) = account_id {
4498 order_lists.retain(|ol| {
4499 ol.client_order_ids.iter().any(|client_order_id| {
4500 self.orders.get(client_order_id).is_some_and(|order_cell| {
4501 order_cell.borrow().account_id().as_ref() == Some(account_id)
4502 })
4503 })
4504 });
4505 }
4506
4507 order_lists
4508 }
4509
4510 #[must_use]
4512 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
4513 self.order_lists.contains_key(order_list_id)
4514 }
4515
4516 #[must_use]
4521 pub fn orders_for_exec_algorithm(
4522 &self,
4523 exec_algorithm_id: &ExecAlgorithmId,
4524 venue: Option<&Venue>,
4525 instrument_id: Option<&InstrumentId>,
4526 strategy_id: Option<&StrategyId>,
4527 account_id: Option<&AccountId>,
4528 side: Option<OrderSide>,
4529 ) -> Vec<OrderRef<'_>> {
4530 let Some(exec_algorithm_order_ids) =
4531 self.index.exec_algorithm_orders.get(exec_algorithm_id)
4532 else {
4533 return Vec::new();
4534 };
4535
4536 let filtered = self.query_orders_in_bucket(
4537 exec_algorithm_order_ids,
4538 venue,
4539 instrument_id,
4540 strategy_id,
4541 account_id,
4542 );
4543 self.get_orders_for_ids(&filtered, side)
4544 }
4545
4546 #[must_use]
4548 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
4549 match self.index.exec_spawn_orders.get(exec_spawn_id) {
4550 Some(ids) => self.get_orders_for_ids(ids, None),
4551 None => Vec::new(),
4552 }
4553 }
4554
4555 #[must_use]
4557 pub fn exec_spawn_total_quantity(
4558 &self,
4559 exec_spawn_id: &ClientOrderId,
4560 active_only: bool,
4561 ) -> Option<Quantity> {
4562 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4563
4564 let mut total_quantity: Option<Quantity> = None;
4565
4566 for spawn_order in exec_spawn_orders {
4567 if active_only && spawn_order.is_closed() {
4568 continue;
4569 }
4570
4571 match total_quantity.as_mut() {
4572 Some(total) => *total = *total + spawn_order.quantity(),
4573 None => total_quantity = Some(spawn_order.quantity()),
4574 }
4575 }
4576
4577 total_quantity
4578 }
4579
4580 #[must_use]
4582 pub fn exec_spawn_total_filled_qty(
4583 &self,
4584 exec_spawn_id: &ClientOrderId,
4585 active_only: bool,
4586 ) -> Option<Quantity> {
4587 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4588
4589 let mut total_quantity: Option<Quantity> = None;
4590
4591 for spawn_order in exec_spawn_orders {
4592 if active_only && spawn_order.is_closed() {
4593 continue;
4594 }
4595
4596 match total_quantity.as_mut() {
4597 Some(total) => *total = *total + spawn_order.filled_qty(),
4598 None => total_quantity = Some(spawn_order.filled_qty()),
4599 }
4600 }
4601
4602 total_quantity
4603 }
4604
4605 #[must_use]
4607 pub fn exec_spawn_total_leaves_qty(
4608 &self,
4609 exec_spawn_id: &ClientOrderId,
4610 active_only: bool,
4611 ) -> Option<Quantity> {
4612 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4613
4614 let mut total_quantity: Option<Quantity> = None;
4615
4616 for spawn_order in exec_spawn_orders {
4617 if active_only && spawn_order.is_closed() {
4618 continue;
4619 }
4620
4621 match total_quantity.as_mut() {
4622 Some(total) => *total = *total + spawn_order.leaves_qty(),
4623 None => total_quantity = Some(spawn_order.leaves_qty()),
4624 }
4625 }
4626
4627 total_quantity
4628 }
4629
4630 #[must_use]
4634 pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
4635 self.positions
4636 .get(position_id)
4637 .map(|position_cell| PositionRef::new(position_cell.borrow()))
4638 }
4639
4640 #[must_use]
4650 pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
4651 self.positions
4652 .get(position_id)
4653 .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
4654 }
4655
4656 #[must_use]
4661 pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
4662 self.positions
4663 .get(position_id)
4664 .map(|position_cell| position_cell.borrow().clone())
4665 }
4666
4667 #[must_use]
4669 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
4670 self.index
4671 .order_position
4672 .get(client_order_id)
4673 .and_then(|position_id| self.positions.get(position_id))
4674 .map(|position_cell| PositionRef::new(position_cell.borrow()))
4675 }
4676
4677 #[must_use]
4679 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
4680 self.index.order_position.get(client_order_id)
4681 }
4682
4683 #[must_use]
4689 pub fn positions(
4690 &self,
4691 venue: Option<&Venue>,
4692 instrument_id: Option<&InstrumentId>,
4693 strategy_id: Option<&StrategyId>,
4694 account_id: Option<&AccountId>,
4695 side: Option<PositionSide>,
4696 ) -> Vec<PositionRef<'_>> {
4697 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
4698 self.get_positions_for_ids(&position_ids, side)
4699 }
4700
4701 #[must_use]
4703 pub fn positions_open(
4704 &self,
4705 venue: Option<&Venue>,
4706 instrument_id: Option<&InstrumentId>,
4707 strategy_id: Option<&StrategyId>,
4708 account_id: Option<&AccountId>,
4709 side: Option<PositionSide>,
4710 ) -> Vec<PositionRef<'_>> {
4711 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
4712 self.get_positions_for_ids(&position_ids, side)
4713 }
4714
4715 #[must_use]
4717 pub fn positions_closed(
4718 &self,
4719 venue: Option<&Venue>,
4720 instrument_id: Option<&InstrumentId>,
4721 strategy_id: Option<&StrategyId>,
4722 account_id: Option<&AccountId>,
4723 side: Option<PositionSide>,
4724 ) -> Vec<PositionRef<'_>> {
4725 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
4726 self.get_positions_for_ids(&position_ids, side)
4727 }
4728
4729 #[must_use]
4731 pub fn position_exists(&self, position_id: &PositionId) -> bool {
4732 self.index.positions.contains(position_id)
4733 }
4734
4735 #[must_use]
4737 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
4738 self.index.positions_open.contains(position_id)
4739 }
4740
4741 #[must_use]
4743 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
4744 self.index.positions_closed.contains(position_id)
4745 }
4746
4747 #[must_use]
4749 pub fn positions_open_count(
4750 &self,
4751 venue: Option<&Venue>,
4752 instrument_id: Option<&InstrumentId>,
4753 strategy_id: Option<&StrategyId>,
4754 account_id: Option<&AccountId>,
4755 side: Option<PositionSide>,
4756 ) -> usize {
4757 self.count_positions_in_bucket(
4758 &self.index.positions_open,
4759 venue,
4760 instrument_id,
4761 strategy_id,
4762 account_id,
4763 side,
4764 )
4765 }
4766
4767 #[must_use]
4769 pub fn positions_closed_count(
4770 &self,
4771 venue: Option<&Venue>,
4772 instrument_id: Option<&InstrumentId>,
4773 strategy_id: Option<&StrategyId>,
4774 account_id: Option<&AccountId>,
4775 side: Option<PositionSide>,
4776 ) -> usize {
4777 self.count_positions_in_bucket(
4778 &self.index.positions_closed,
4779 venue,
4780 instrument_id,
4781 strategy_id,
4782 account_id,
4783 side,
4784 )
4785 }
4786
4787 #[must_use]
4789 pub fn positions_total_count(
4790 &self,
4791 venue: Option<&Venue>,
4792 instrument_id: Option<&InstrumentId>,
4793 strategy_id: Option<&StrategyId>,
4794 account_id: Option<&AccountId>,
4795 side: Option<PositionSide>,
4796 ) -> usize {
4797 self.count_positions_in_bucket(
4798 &self.index.positions,
4799 venue,
4800 instrument_id,
4801 strategy_id,
4802 account_id,
4803 side,
4804 )
4805 }
4806
4807 #[must_use]
4813 pub fn has_positions_open(
4814 &self,
4815 venue: Option<&Venue>,
4816 instrument_id: Option<&InstrumentId>,
4817 strategy_id: Option<&StrategyId>,
4818 account_id: Option<&AccountId>,
4819 side: Option<PositionSide>,
4820 ) -> bool {
4821 self.any_positions_in_bucket(
4822 &self.index.positions_open,
4823 venue,
4824 instrument_id,
4825 strategy_id,
4826 account_id,
4827 side,
4828 )
4829 }
4830
4831 #[must_use]
4833 pub fn has_positions_closed(
4834 &self,
4835 venue: Option<&Venue>,
4836 instrument_id: Option<&InstrumentId>,
4837 strategy_id: Option<&StrategyId>,
4838 account_id: Option<&AccountId>,
4839 side: Option<PositionSide>,
4840 ) -> bool {
4841 self.any_positions_in_bucket(
4842 &self.index.positions_closed,
4843 venue,
4844 instrument_id,
4845 strategy_id,
4846 account_id,
4847 side,
4848 )
4849 }
4850
4851 #[must_use]
4853 pub fn has_positions(
4854 &self,
4855 venue: Option<&Venue>,
4856 instrument_id: Option<&InstrumentId>,
4857 strategy_id: Option<&StrategyId>,
4858 account_id: Option<&AccountId>,
4859 side: Option<PositionSide>,
4860 ) -> bool {
4861 self.any_positions_in_bucket(
4862 &self.index.positions,
4863 venue,
4864 instrument_id,
4865 strategy_id,
4866 account_id,
4867 side,
4868 )
4869 }
4870
4871 #[must_use]
4875 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
4876 self.index.order_strategy.get(client_order_id)
4877 }
4878
4879 #[must_use]
4881 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
4882 self.index.position_strategy.get(position_id)
4883 }
4884
4885 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
4893 check_valid_string_ascii(key, stringify!(key))?;
4894
4895 Ok(self.general.get(key))
4896 }
4897
4898 #[must_use]
4902 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
4903 match price_type {
4904 PriceType::Bid => self
4905 .quotes
4906 .get(instrument_id)
4907 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
4908 PriceType::Ask => self
4909 .quotes
4910 .get(instrument_id)
4911 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
4912 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
4913 quotes.front().map(|quote| {
4914 Price::new(
4915 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
4916 quote.bid_price.precision + 1,
4917 )
4918 })
4919 }),
4920 PriceType::Last => self
4921 .trades
4922 .get(instrument_id)
4923 .and_then(|trades| trades.front().map(|trade| trade.price)),
4924 PriceType::Mark => self
4925 .mark_prices
4926 .get(instrument_id)
4927 .and_then(|marks| marks.front().map(|mark| mark.value)),
4928 }
4929 }
4930
4931 #[must_use]
4933 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
4934 self.quotes
4935 .get(instrument_id)
4936 .map(|quotes| quotes.iter().copied().collect())
4937 }
4938
4939 #[must_use]
4941 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
4942 self.trades
4943 .get(instrument_id)
4944 .map(|trades| trades.iter().copied().collect())
4945 }
4946
4947 #[must_use]
4949 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
4950 self.mark_prices
4951 .get(instrument_id)
4952 .map(|mark_prices| mark_prices.iter().copied().collect())
4953 }
4954
4955 #[must_use]
4957 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
4958 self.index_prices
4959 .get(instrument_id)
4960 .map(|index_prices| index_prices.iter().copied().collect())
4961 }
4962
4963 #[must_use]
4965 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
4966 self.funding_rates
4967 .get(instrument_id)
4968 .map(|funding_rates| funding_rates.iter().copied().collect())
4969 }
4970
4971 #[must_use]
4973 pub fn instrument_statuses(
4974 &self,
4975 instrument_id: &InstrumentId,
4976 ) -> Option<Vec<InstrumentStatus>> {
4977 self.instrument_statuses
4978 .get(instrument_id)
4979 .map(|statuses| statuses.iter().copied().collect())
4980 }
4981
4982 #[must_use]
4984 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
4985 self.bars
4986 .get(bar_type)
4987 .map(|bars| bars.iter().copied().collect())
4988 }
4989
4990 #[must_use]
4992 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
4993 self.books.get(instrument_id)
4994 }
4995
4996 #[must_use]
4998 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
4999 self.books.get_mut(instrument_id)
5000 }
5001
5002 #[must_use]
5004 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
5005 self.own_books.get(instrument_id)
5006 }
5007
5008 #[must_use]
5010 pub fn own_order_book_mut(
5011 &mut self,
5012 instrument_id: &InstrumentId,
5013 ) -> Option<&mut OwnOrderBook> {
5014 self.own_books.get_mut(instrument_id)
5015 }
5016
5017 #[must_use]
5019 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
5020 self.quotes
5021 .get(instrument_id)
5022 .and_then(|quotes| quotes.front())
5023 }
5024
5025 #[must_use]
5029 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
5030 self.quotes
5031 .get(instrument_id)
5032 .and_then(|quotes| quotes.get(index))
5033 }
5034
5035 #[must_use]
5037 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
5038 self.trades
5039 .get(instrument_id)
5040 .and_then(|trades| trades.front())
5041 }
5042
5043 #[must_use]
5047 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
5048 self.trades
5049 .get(instrument_id)
5050 .and_then(|trades| trades.get(index))
5051 }
5052
5053 #[must_use]
5055 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
5056 self.mark_prices
5057 .get(instrument_id)
5058 .and_then(|mark_prices| mark_prices.front())
5059 }
5060
5061 #[must_use]
5063 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
5064 self.index_prices
5065 .get(instrument_id)
5066 .and_then(|index_prices| index_prices.front())
5067 }
5068
5069 #[must_use]
5071 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
5072 self.funding_rates
5073 .get(instrument_id)
5074 .and_then(|funding_rates| funding_rates.front())
5075 }
5076
5077 #[must_use]
5079 pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
5080 self.instrument_statuses
5081 .get(instrument_id)
5082 .and_then(|statuses| statuses.front())
5083 }
5084
5085 #[must_use]
5087 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
5088 self.bars.get(bar_type).and_then(|bars| bars.front())
5089 }
5090
5091 #[must_use]
5095 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
5096 self.bars.get(bar_type).and_then(|bars| bars.get(index))
5097 }
5098
5099 #[must_use]
5101 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
5102 self.books
5103 .get(instrument_id)
5104 .map_or(0, |book| book.update_count) as usize
5105 }
5106
5107 #[must_use]
5109 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
5110 self.quotes
5111 .get(instrument_id)
5112 .map_or(0, BoundedVecDeque::len)
5113 }
5114
5115 #[must_use]
5117 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
5118 self.trades
5119 .get(instrument_id)
5120 .map_or(0, BoundedVecDeque::len)
5121 }
5122
5123 #[must_use]
5125 pub fn bar_count(&self, bar_type: &BarType) -> usize {
5126 self.bars.get(bar_type).map_or(0, BoundedVecDeque::len)
5127 }
5128
5129 #[must_use]
5131 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
5132 self.books.contains_key(instrument_id)
5133 }
5134
5135 #[must_use]
5137 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
5138 self.quote_count(instrument_id) > 0
5139 }
5140
5141 #[must_use]
5143 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
5144 self.trade_count(instrument_id) > 0
5145 }
5146
5147 #[must_use]
5149 pub fn has_bars(&self, bar_type: &BarType) -> bool {
5150 self.bar_count(bar_type) > 0
5151 }
5152
5153 #[must_use]
5154 pub fn get_xrate(
5155 &self,
5156 venue: Venue,
5157 from_currency: Currency,
5158 to_currency: Currency,
5159 price_type: PriceType,
5160 ) -> Option<Decimal> {
5161 if from_currency == to_currency {
5162 return Some(Decimal::ONE);
5165 }
5166
5167 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
5168
5169 match get_exchange_rate(
5170 from_currency.code,
5171 to_currency.code,
5172 price_type,
5173 bid_quote,
5174 ask_quote,
5175 ) {
5176 Ok(rate) => rate,
5177 Err(e) => {
5178 log::error!("Failed to calculate xrate: {e}");
5179 None
5180 }
5181 }
5182 }
5183
5184 fn build_quote_table(
5185 &self,
5186 venue: &Venue,
5187 ) -> (AHashMap<Ustr, Decimal>, AHashMap<Ustr, Decimal>) {
5188 let mut bid_quotes = AHashMap::new();
5189 let mut ask_quotes = AHashMap::new();
5190
5191 for instrument_id in self.instruments.keys() {
5192 if instrument_id.venue != *venue {
5193 continue;
5194 }
5195
5196 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
5197 if let Some(tick) = ticks.front() {
5198 (tick.bid_price, tick.ask_price)
5199 } else {
5200 continue; }
5202 } else {
5203 let bid_bar = self
5204 .bars
5205 .iter()
5206 .find(|(k, _)| {
5207 k.instrument_id() == *instrument_id
5208 && matches!(k.spec().price_type, PriceType::Bid)
5209 })
5210 .map(|(_, v)| v);
5211
5212 let ask_bar = self
5213 .bars
5214 .iter()
5215 .find(|(k, _)| {
5216 k.instrument_id() == *instrument_id
5217 && matches!(k.spec().price_type, PriceType::Ask)
5218 })
5219 .map(|(_, v)| v);
5220
5221 match (bid_bar, ask_bar) {
5222 (Some(bid), Some(ask)) => {
5223 match (bid.front(), ask.front()) {
5224 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
5225 _ => {
5226 continue;
5228 }
5229 }
5230 }
5231 _ => continue,
5232 }
5233 };
5234
5235 bid_quotes.insert(instrument_id.symbol.inner(), bid_price.as_decimal());
5236 ask_quotes.insert(instrument_id.symbol.inner(), ask_price.as_decimal());
5237 }
5238
5239 (bid_quotes, ask_quotes)
5240 }
5241
5242 #[must_use]
5244 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
5245 self.mark_xrates.get(&(from_currency, to_currency)).copied()
5246 }
5247
5248 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
5254 assert!(xrate > 0.0, "xrate was zero");
5255 self.mark_xrates.insert((from_currency, to_currency), xrate);
5256 self.mark_xrates
5257 .insert((to_currency, from_currency), 1.0 / xrate);
5258 }
5259
5260 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
5262 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
5263 }
5264
5265 pub fn clear_mark_xrates(&mut self) {
5267 self.mark_xrates.clear();
5268 }
5269
5270 #[must_use]
5274 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
5275 self.instruments.get(instrument_id)
5276 }
5277
5278 #[must_use]
5280 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
5281 match venue {
5282 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
5283 None => self.instruments.keys().collect(),
5284 }
5285 }
5286
5287 #[must_use]
5289 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
5290 self.instruments
5291 .values()
5292 .filter(|i| &i.id().venue == venue)
5293 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
5294 .collect()
5295 }
5296
5297 #[must_use]
5304 pub fn instruments_by_parent(
5305 &self,
5306 venue: &Venue,
5307 root: &Ustr,
5308 class: InstrumentClass,
5309 ) -> Vec<&InstrumentAny> {
5310 self.instruments
5311 .values()
5312 .filter(|i| &i.id().venue == venue)
5313 .filter(|i| i.underlying() == Some(*root))
5314 .filter(|i| i.instrument_class() == class)
5315 .collect()
5316 }
5317
5318 #[must_use]
5320 pub fn bar_types(
5321 &self,
5322 instrument_id: Option<&InstrumentId>,
5323 price_type: Option<&PriceType>,
5324 aggregation_source: AggregationSource,
5325 ) -> Vec<&BarType> {
5326 let mut bar_types = self
5327 .bars
5328 .keys()
5329 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
5330 .collect::<Vec<&BarType>>();
5331
5332 if let Some(instrument_id) = instrument_id {
5333 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
5334 }
5335
5336 if let Some(price_type) = price_type {
5337 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
5338 }
5339
5340 bar_types
5341 }
5342
5343 #[must_use]
5347 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
5348 self.synthetics.get(instrument_id)
5349 }
5350
5351 #[must_use]
5353 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
5354 self.synthetics.keys().collect()
5355 }
5356
5357 #[must_use]
5359 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
5360 self.synthetics.values().collect()
5361 }
5362
5363 #[must_use]
5367 pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
5368 self.accounts
5369 .get(account_id)
5370 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5371 }
5372
5373 #[must_use]
5383 pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
5384 self.accounts
5385 .get(account_id)
5386 .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
5387 }
5388
5389 #[must_use]
5394 pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
5395 self.accounts
5396 .get(account_id)
5397 .map(|account_cell| account_cell.borrow().clone())
5398 }
5399
5400 #[must_use]
5402 pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
5403 self.index
5404 .venue_account
5405 .get(venue)
5406 .and_then(|account_id| self.accounts.get(account_id))
5407 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5408 }
5409
5410 #[must_use]
5415 pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
5416 self.index
5417 .venue_account
5418 .get(venue)
5419 .and_then(|account_id| self.accounts.get(account_id))
5420 .map(|account_cell| account_cell.borrow().clone())
5421 }
5422
5423 #[must_use]
5425 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
5426 self.index.venue_account.get(venue)
5427 }
5428
5429 #[must_use]
5435 pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
5436 self.accounts
5437 .values()
5438 .filter(|account_cell| &account_cell.borrow().id() == account_id)
5439 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5440 .collect()
5441 }
5442
5443 pub fn update_own_order_book(&mut self, order: &OrderAny) {
5451 if !order.has_price() {
5452 return;
5453 }
5454
5455 let instrument_id = order.instrument_id();
5456
5457 if !self.own_books.contains_key(&instrument_id) {
5458 if order.is_closed() {
5459 return;
5460 }
5461
5462 self.own_books
5463 .insert(instrument_id, OwnOrderBook::new(instrument_id));
5464 }
5465
5466 let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
5467 return;
5468 };
5469
5470 let own_book_order = order.to_own_book_order();
5471
5472 if order.is_closed() {
5473 if let Err(e) = own_book.delete(own_book_order) {
5474 log::debug!(
5475 "Failed to delete order {} from own book: {e}",
5476 order.client_order_id(),
5477 );
5478 } else {
5479 log::debug!("Deleted order {} from own book", order.client_order_id());
5480 }
5481 } else {
5482 if let Err(e) = own_book.update(own_book_order) {
5484 log::debug!(
5485 "Failed to update order {} in own book: {e}; inserting instead",
5486 order.client_order_id(),
5487 );
5488 own_book.add(own_book_order);
5489 }
5490 log::debug!("Updated order {} in own book", order.client_order_id());
5491 }
5492 }
5493
5494 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
5500 let Some(order_cell) = self.orders.get(client_order_id) else {
5501 return;
5502 };
5503 let order = order_cell.borrow();
5504 let instrument_id = order.instrument_id();
5505 let own_book_order = if order.has_price() {
5506 Some(order.to_own_book_order())
5507 } else {
5508 None
5509 };
5510 drop(order);
5511
5512 self.index.orders_open.remove(client_order_id);
5513 self.index.orders_pending_cancel.remove(client_order_id);
5514 self.index.orders_inflight.remove(client_order_id);
5515 self.index.orders_emulated.remove(client_order_id);
5516 self.index.orders_active_local.remove(client_order_id);
5517
5518 if let Some(own_book) = self.own_books.get_mut(&instrument_id)
5519 && let Some(own_book_order) = own_book_order
5520 {
5521 if let Err(e) = own_book.delete(own_book_order) {
5522 log::debug!("Could not force delete {client_order_id} from own book: {e}");
5523 } else {
5524 log::debug!("Force deleted {client_order_id} from own book");
5525 }
5526 }
5527
5528 self.index.orders_closed.insert(*client_order_id);
5529 }
5530
5531 pub fn audit_own_order_books(&mut self) {
5538 log::debug!("Starting own books audit");
5539 let start = std::time::Instant::now();
5540
5541 let valid_order_ids: AHashSet<ClientOrderId> = self
5544 .index
5545 .orders_open
5546 .union(&self.index.orders_inflight)
5547 .copied()
5548 .collect();
5549
5550 for own_book in self.own_books.values_mut() {
5551 own_book.audit_open_orders(&valid_order_ids);
5552 }
5553
5554 log::debug!("Completed own books audit in {:?}", start.elapsed());
5555 }
5556}
5557
5558fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
5559 let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
5560 anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
5561 };
5562
5563 let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
5564 anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
5565 };
5566
5567 if position_id.is_empty() {
5568 anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
5569 }
5570
5571 let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
5572 anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
5573 })?;
5574
5575 Ok((PositionId::new(position_id), snapshot_index))
5576}
5577
5578fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
5579 let snapshot = serde_json::from_slice::<Position>(blob)?;
5580 let expected_prefix = format!("{}-", position_id.as_str());
5581
5582 let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
5583 anyhow::bail!(
5584 "position snapshot id {} does not match blob_ref position {position_id}",
5585 snapshot.id
5586 );
5587 };
5588
5589 if UUID4::from_str(snapshot_uuid).is_err() {
5590 anyhow::bail!(
5591 "position snapshot id {} does not match blob_ref position {position_id}",
5592 snapshot.id
5593 );
5594 }
5595
5596 Ok(())
5597}