1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24pub mod refs;
25
26mod index;
27
28#[cfg(test)]
29mod tests;
30
31use std::{
32 borrow::Cow,
33 cell::{Ref, RefCell},
34 collections::VecDeque,
35 fmt::{Debug, Display},
36 rc::Rc,
37 str::FromStr,
38 time::{SystemTime, UNIX_EPOCH},
39};
40
41use ahash::{AHashMap, AHashSet};
42use bytes::Bytes;
43pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
45use index::CacheIndex;
46use nautilus_core::{
47 SharedCell, UUID4, UnixNanos,
48 correctness::{
49 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
50 check_valid_string_ascii,
51 },
52 datetime::secs_to_nanos_unchecked,
53};
54use nautilus_model::{
55 accounts::{Account, AccountAny},
56 data::{
57 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
58 MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
59 },
60 enums::{
61 AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
62 PriceType, TriggerType,
63 },
64 events::{AccountState, OrderEventAny},
65 identifiers::{
66 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
67 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
68 },
69 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
70 orderbook::{
71 OrderBook,
72 own::{OwnOrderBook, should_handle_own_book_order},
73 },
74 orders::{Order, OrderAny, OrderError, OrderList},
75 position::Position,
76 types::{Currency, Money, Price, Quantity},
77};
78pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
79use ustr::Ustr;
80
81use crate::xrate::get_exchange_rate;
82
83#[derive(Clone, Debug, PartialEq, Eq)]
88pub struct CacheSnapshotRef {
89 pub blob_ref: String,
91 pub blob: Bytes,
93}
94
95impl CacheSnapshotRef {
96 #[must_use]
98 pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
99 Self {
100 blob_ref: blob_ref.into(),
101 blob: blob.into(),
102 }
103 }
104}
105
106#[derive(Clone, Debug)]
111pub struct CacheView {
112 inner: Rc<RefCell<Cache>>,
113}
114
115impl CacheView {
116 #[must_use]
118 pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
119 Self { inner }
120 }
121
122 pub fn borrow(&self) -> Ref<'_, Cache> {
128 self.inner.borrow()
129 }
130}
131
132impl From<Rc<RefCell<Cache>>> for CacheView {
133 fn from(inner: Rc<RefCell<Cache>>) -> Self {
134 Self::new(inner)
135 }
136}
137
138enum FilterSources<'a, K> {
145 Unfiltered,
146 Empty,
147 Sets(Vec<&'a AHashSet<K>>),
148}
149
150fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
156where
157 K: Copy + Eq + std::hash::Hash,
158{
159 debug_assert!(!sources.is_empty());
160 sources.sort_unstable_by_key(|s| s.len());
161 let driver = sources[0];
162 let rest = &sources[1..];
163
164 if rest.is_empty() {
165 return driver.clone();
166 }
167
168 driver
169 .iter()
170 .filter(|id| rest.iter().all(|s| s.contains(id)))
171 .copied()
172 .collect()
173}
174
175fn intersect_pair_or_many<'a, K>(
183 bucket: &'a AHashSet<K>,
184 mut sources: Vec<&'a AHashSet<K>>,
185) -> AHashSet<K>
186where
187 K: Copy + Eq + std::hash::Hash,
188{
189 debug_assert!(!sources.is_empty());
190 if sources.len() == 1 {
191 let filter = sources[0];
192 let (larger, smaller) = if bucket.len() >= filter.len() {
193 (bucket, filter)
194 } else {
195 (filter, bucket)
196 };
197 return larger.intersection(smaller).copied().collect();
198 }
199
200 sources.push(bucket);
201 intersect_filter_sources(sources)
202}
203
204#[cfg_attr(
206 feature = "python",
207 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
208)]
209pub struct Cache {
210 config: CacheConfig,
211 index: CacheIndex,
212 database: Option<Box<dyn CacheDatabaseAdapter>>,
213 general: AHashMap<String, Bytes>,
214 currencies: AHashMap<Ustr, Currency>,
215 instruments: AHashMap<InstrumentId, InstrumentAny>,
216 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
217 books: AHashMap<InstrumentId, OrderBook>,
218 own_books: AHashMap<InstrumentId, OwnOrderBook>,
219 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
220 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
221 mark_xrates: AHashMap<(Currency, Currency), f64>,
222 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
223 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
224 funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
225 instrument_statuses: AHashMap<InstrumentId, VecDeque<InstrumentStatus>>,
226 bars: AHashMap<BarType, VecDeque<Bar>>,
227 greeks: AHashMap<InstrumentId, GreeksData>,
228 option_greeks: AHashMap<InstrumentId, OptionGreeks>,
229 yield_curves: AHashMap<String, YieldCurveData>,
230 accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
231 orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
232 order_lists: AHashMap<OrderListId, OrderList>,
233 positions: AHashMap<PositionId, SharedCell<Position>>,
234 position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
235 #[cfg(feature = "defi")]
236 pub(crate) defi: crate::defi::cache::DefiCache,
237}
238
239impl Debug for Cache {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct(stringify!(Cache))
242 .field("config", &self.config)
243 .field("index", &self.index)
244 .field("general", &self.general)
245 .field("currencies", &self.currencies)
246 .field("instruments", &self.instruments)
247 .field("synthetics", &self.synthetics)
248 .field("books", &self.books)
249 .field("own_books", &self.own_books)
250 .field("quotes", &self.quotes)
251 .field("trades", &self.trades)
252 .field("mark_xrates", &self.mark_xrates)
253 .field("mark_prices", &self.mark_prices)
254 .field("index_prices", &self.index_prices)
255 .field("funding_rates", &self.funding_rates)
256 .field("instrument_statuses", &self.instrument_statuses)
257 .field("bars", &self.bars)
258 .field("greeks", &self.greeks)
259 .field("option_greeks", &self.option_greeks)
260 .field("yield_curves", &self.yield_curves)
261 .field("accounts", &self.accounts)
262 .field("orders", &self.orders)
263 .field("order_lists", &self.order_lists)
264 .field("positions", &self.positions)
265 .field("position_snapshots", &self.position_snapshots)
266 .finish()
267 }
268}
269
270impl Default for Cache {
271 fn default() -> Self {
273 Self::new(Some(CacheConfig::default()), None)
274 }
275}
276
277impl Cache {
278 #[must_use]
280 pub fn new(
284 config: Option<CacheConfig>,
285 database: Option<Box<dyn CacheDatabaseAdapter>>,
286 ) -> Self {
287 Self {
288 config: config.unwrap_or_default(),
289 index: CacheIndex::default(),
290 database,
291 general: AHashMap::new(),
292 currencies: AHashMap::new(),
293 instruments: AHashMap::new(),
294 synthetics: AHashMap::new(),
295 books: AHashMap::new(),
296 own_books: AHashMap::new(),
297 quotes: AHashMap::new(),
298 trades: AHashMap::new(),
299 mark_xrates: AHashMap::new(),
300 mark_prices: AHashMap::new(),
301 index_prices: AHashMap::new(),
302 funding_rates: AHashMap::new(),
303 instrument_statuses: AHashMap::new(),
304 bars: AHashMap::new(),
305 greeks: AHashMap::new(),
306 option_greeks: AHashMap::new(),
307 yield_curves: AHashMap::new(),
308 accounts: AHashMap::new(),
309 orders: AHashMap::new(),
310 order_lists: AHashMap::new(),
311 positions: AHashMap::new(),
312 position_snapshots: AHashMap::new(),
313 #[cfg(feature = "defi")]
314 defi: crate::defi::cache::DefiCache::default(),
315 }
316 }
317
318 #[must_use]
320 pub fn memory_address(&self) -> String {
321 format!("{:?}", std::ptr::from_ref(self))
322 }
323
324 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
328 let type_name = std::any::type_name_of_val(&*database);
329 log::info!("Cache database adapter set: {type_name}");
330 self.database = Some(database);
331 }
332
333 pub fn cache_general(&mut self) -> anyhow::Result<()> {
341 self.general = match &mut self.database {
342 Some(db) => db.load()?,
343 None => AHashMap::new(),
344 };
345
346 log::info!(
347 "Cached {} general object(s) from database",
348 self.general.len()
349 );
350 Ok(())
351 }
352
353 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
359 let cache_map = match &self.database {
360 Some(db) => db.load_all().await?,
361 None => CacheMap::default(),
362 };
363
364 self.currencies = cache_map.currencies;
365 self.instruments = cache_map.instruments;
366 self.synthetics = cache_map.synthetics;
367 self.accounts = cache_map
368 .accounts
369 .into_iter()
370 .map(|(id, account)| (id, SharedCell::new(account)))
371 .collect();
372 self.orders = cache_map
373 .orders
374 .into_iter()
375 .map(|(id, order)| (id, SharedCell::new(order)))
376 .collect();
377 self.positions = cache_map
378 .positions
379 .into_iter()
380 .map(|(id, position)| (id, SharedCell::new(position)))
381 .collect();
382
383 self.assign_position_ids_to_contingencies();
384 Ok(())
385 }
386
387 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
393 self.currencies = match &mut self.database {
394 Some(db) => db.load_currencies().await?,
395 None => AHashMap::new(),
396 };
397
398 log::info!("Cached {} currencies from database", self.general.len());
399 Ok(())
400 }
401
402 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
408 self.instruments = match &mut self.database {
409 Some(db) => db.load_instruments().await?,
410 None => AHashMap::new(),
411 };
412
413 log::info!("Cached {} instruments from database", self.general.len());
414 Ok(())
415 }
416
417 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
423 self.synthetics = match &mut self.database {
424 Some(db) => db.load_synthetics().await?,
425 None => AHashMap::new(),
426 };
427
428 log::info!(
429 "Cached {} synthetic instruments from database",
430 self.general.len()
431 );
432 Ok(())
433 }
434
435 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
441 self.accounts = match &mut self.database {
442 Some(db) => db
443 .load_accounts()
444 .await?
445 .into_iter()
446 .map(|(id, account)| (id, SharedCell::new(account)))
447 .collect(),
448 None => AHashMap::new(),
449 };
450
451 log::info!(
452 "Cached {} synthetic instruments from database",
453 self.general.len()
454 );
455 Ok(())
456 }
457
458 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
464 self.orders = match &mut self.database {
465 Some(db) => db
466 .load_orders()
467 .await?
468 .into_iter()
469 .map(|(id, order)| (id, SharedCell::new(order)))
470 .collect(),
471 None => AHashMap::new(),
472 };
473
474 log::info!("Cached {} orders from database", self.general.len());
475
476 self.assign_position_ids_to_contingencies();
477 Ok(())
478 }
479
480 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
486 self.positions = match &mut self.database {
487 Some(db) => db
488 .load_positions()
489 .await?
490 .into_iter()
491 .map(|(id, position)| (id, SharedCell::new(position)))
492 .collect(),
493 None => AHashMap::new(),
494 };
495
496 log::info!("Cached {} positions from database", self.general.len());
497 Ok(())
498 }
499
500 pub fn build_index(&mut self) {
502 log::debug!("Building index");
503
504 for account_id in self.accounts.keys() {
506 self.index
507 .venue_account
508 .insert(account_id.get_issuer(), *account_id);
509 }
510
511 for (client_order_id, order_cell) in &self.orders {
513 let order = order_cell.borrow();
514 let instrument_id = order.instrument_id();
515 let venue = instrument_id.venue;
516 let strategy_id = order.strategy_id();
517
518 self.index
520 .venue_orders
521 .entry(venue)
522 .or_default()
523 .insert(*client_order_id);
524
525 if let Some(venue_order_id) = order.venue_order_id() {
527 self.index
528 .venue_order_ids
529 .insert(venue_order_id, *client_order_id);
530 }
531
532 if let Some(position_id) = order.position_id() {
534 self.index
535 .order_position
536 .insert(*client_order_id, position_id);
537 }
538
539 self.index
541 .order_strategy
542 .insert(*client_order_id, order.strategy_id());
543
544 self.index
546 .instrument_orders
547 .entry(instrument_id)
548 .or_default()
549 .insert(*client_order_id);
550
551 self.index
553 .strategy_orders
554 .entry(strategy_id)
555 .or_default()
556 .insert(*client_order_id);
557
558 if let Some(account_id) = order.account_id() {
560 self.index
561 .account_orders
562 .entry(account_id)
563 .or_default()
564 .insert(*client_order_id);
565 }
566
567 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
569 self.index
570 .exec_algorithm_orders
571 .entry(exec_algorithm_id)
572 .or_default()
573 .insert(*client_order_id);
574 }
575
576 if let Some(exec_spawn_id) = order.exec_spawn_id() {
578 self.index
579 .exec_spawn_orders
580 .entry(exec_spawn_id)
581 .or_default()
582 .insert(*client_order_id);
583 }
584
585 self.index.orders.insert(*client_order_id);
587
588 if order.is_active_local() {
590 self.index.orders_active_local.insert(*client_order_id);
591 }
592
593 if order.is_open() {
595 self.index.orders_open.insert(*client_order_id);
596 }
597
598 if order.is_closed() {
600 self.index.orders_closed.insert(*client_order_id);
601 }
602
603 if let Some(emulation_trigger) = order.emulation_trigger()
605 && emulation_trigger != TriggerType::NoTrigger
606 && !order.is_closed()
607 {
608 self.index.orders_emulated.insert(*client_order_id);
609 }
610
611 if order.is_inflight() {
613 self.index.orders_inflight.insert(*client_order_id);
614 }
615
616 self.index.strategies.insert(strategy_id);
618
619 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
621 self.index.exec_algorithms.insert(exec_algorithm_id);
622 }
623 }
624
625 for (position_id, position_cell) in &self.positions {
627 let position = position_cell.borrow();
628 let instrument_id = position.instrument_id;
629 let venue = instrument_id.venue;
630 let strategy_id = position.strategy_id;
631
632 self.index
634 .venue_positions
635 .entry(venue)
636 .or_default()
637 .insert(*position_id);
638
639 self.index
641 .position_strategy
642 .insert(*position_id, position.strategy_id);
643
644 self.index
646 .position_orders
647 .entry(*position_id)
648 .or_default()
649 .extend(position.client_order_ids());
650
651 self.index
653 .instrument_positions
654 .entry(instrument_id)
655 .or_default()
656 .insert(*position_id);
657
658 self.index
660 .strategy_positions
661 .entry(strategy_id)
662 .or_default()
663 .insert(*position_id);
664
665 self.index
667 .account_positions
668 .entry(position.account_id)
669 .or_default()
670 .insert(*position_id);
671
672 self.index.positions.insert(*position_id);
674
675 if position.is_open() {
677 self.index.positions_open.insert(*position_id);
678 }
679
680 if position.is_closed() {
682 self.index.positions_closed.insert(*position_id);
683 }
684
685 self.index.strategies.insert(strategy_id);
687 }
688 }
689
690 #[must_use]
692 pub const fn has_backing(&self) -> bool {
693 self.database.is_some()
694 }
695
696 #[must_use]
698 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
699 let Some(quote) = self.quote(&position.instrument_id) else {
700 log::warn!(
701 "Cannot calculate unrealized PnL for {}, no quotes for {}",
702 position.id,
703 position.instrument_id
704 );
705 return None;
706 };
707
708 let last = match position.side {
710 PositionSide::Flat | PositionSide::NoPositionSide => {
711 return Some(Money::new(0.0, position.settlement_currency));
712 }
713 PositionSide::Long => quote.bid_price,
714 PositionSide::Short => quote.ask_price,
715 };
716
717 Some(position.unrealized_pnl(last))
718 }
719
720 #[must_use]
729 pub fn check_integrity(&mut self) -> bool {
730 let mut error_count = 0;
731 let failure = "Integrity failure";
732
733 let timestamp_us = SystemTime::now()
735 .duration_since(UNIX_EPOCH)
736 .expect("Time went backwards")
737 .as_micros();
738
739 log::info!("Checking data integrity");
740
741 for account_id in self.accounts.keys() {
743 if !self
744 .index
745 .venue_account
746 .contains_key(&account_id.get_issuer())
747 {
748 log::error!(
749 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
750 );
751 error_count += 1;
752 }
753 }
754
755 for (client_order_id, order_cell) in &self.orders {
756 let order = order_cell.borrow();
757
758 if !self.index.order_strategy.contains_key(client_order_id) {
759 log::error!(
760 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
761 );
762 error_count += 1;
763 }
764
765 if !self.index.orders.contains(client_order_id) {
766 log::error!(
767 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
768 );
769 error_count += 1;
770 }
771
772 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
773 log::error!(
774 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
775 );
776 error_count += 1;
777 }
778
779 if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
780 {
781 log::error!(
782 "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
783 );
784 error_count += 1;
785 }
786
787 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
788 log::error!(
789 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
790 );
791 error_count += 1;
792 }
793
794 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
795 log::error!(
796 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
797 );
798 error_count += 1;
799 }
800
801 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
802 if !self
803 .index
804 .exec_algorithm_orders
805 .contains_key(&exec_algorithm_id)
806 {
807 log::error!(
808 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
809 );
810 error_count += 1;
811 }
812
813 if order.exec_spawn_id().is_none()
814 && !self.index.exec_spawn_orders.contains_key(client_order_id)
815 {
816 log::error!(
817 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
818 );
819 error_count += 1;
820 }
821 }
822 }
823
824 for (position_id, position_cell) in &self.positions {
825 let position = position_cell.borrow();
826
827 if !self.index.position_strategy.contains_key(position_id) {
828 log::error!(
829 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
830 );
831 error_count += 1;
832 }
833
834 if !self.index.position_orders.contains_key(position_id) {
835 log::error!(
836 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
837 );
838 error_count += 1;
839 }
840
841 if !self.index.positions.contains(position_id) {
842 log::error!(
843 "{failure} in positions: {position_id} not found in `self.index.positions`",
844 );
845 error_count += 1;
846 }
847
848 if position.is_open() && !self.index.positions_open.contains(position_id) {
849 log::error!(
850 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
851 );
852 error_count += 1;
853 }
854
855 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
856 log::error!(
857 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
858 );
859 error_count += 1;
860 }
861 }
862
863 for account_id in self.index.venue_account.values() {
865 if !self.accounts.contains_key(account_id) {
866 log::error!(
867 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
868 );
869 error_count += 1;
870 }
871 }
872
873 for client_order_id in self.index.venue_order_ids.values() {
874 if !self.orders.contains_key(client_order_id) {
875 log::error!(
876 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
877 );
878 error_count += 1;
879 }
880 }
881
882 for client_order_id in self.index.client_order_ids.keys() {
883 if !self.orders.contains_key(client_order_id) {
884 log::error!(
885 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
886 );
887 error_count += 1;
888 }
889 }
890
891 for client_order_id in self.index.order_position.keys() {
892 if !self.orders.contains_key(client_order_id) {
893 log::error!(
894 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
895 );
896 error_count += 1;
897 }
898 }
899
900 for client_order_id in self.index.order_strategy.keys() {
902 if !self.orders.contains_key(client_order_id) {
903 log::error!(
904 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
905 );
906 error_count += 1;
907 }
908 }
909
910 for position_id in self.index.position_strategy.keys() {
911 if !self.positions.contains_key(position_id) {
912 log::error!(
913 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
914 );
915 error_count += 1;
916 }
917 }
918
919 for position_id in self.index.position_orders.keys() {
920 if !self.positions.contains_key(position_id) {
921 log::error!(
922 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
923 );
924 error_count += 1;
925 }
926 }
927
928 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
929 for client_order_id in client_order_ids {
930 if !self.orders.contains_key(client_order_id) {
931 log::error!(
932 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
933 );
934 error_count += 1;
935 }
936 }
937 }
938
939 for instrument_id in self.index.instrument_positions.keys() {
940 if !self.index.instrument_orders.contains_key(instrument_id) {
941 log::error!(
942 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
943 );
944 error_count += 1;
945 }
946 }
947
948 for client_order_ids in self.index.strategy_orders.values() {
949 for client_order_id in client_order_ids {
950 if !self.orders.contains_key(client_order_id) {
951 log::error!(
952 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
953 );
954 error_count += 1;
955 }
956 }
957 }
958
959 for position_ids in self.index.strategy_positions.values() {
960 for position_id in position_ids {
961 if !self.positions.contains_key(position_id) {
962 log::error!(
963 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
964 );
965 error_count += 1;
966 }
967 }
968 }
969
970 for client_order_id in &self.index.orders {
971 if !self.orders.contains_key(client_order_id) {
972 log::error!(
973 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
974 );
975 error_count += 1;
976 }
977 }
978
979 for client_order_id in &self.index.orders_emulated {
980 if !self.orders.contains_key(client_order_id) {
981 log::error!(
982 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
983 );
984 error_count += 1;
985 }
986 }
987
988 for client_order_id in &self.index.orders_active_local {
989 if !self.orders.contains_key(client_order_id) {
990 log::error!(
991 "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
992 );
993 error_count += 1;
994 }
995 }
996
997 for client_order_id in &self.index.orders_inflight {
998 if !self.orders.contains_key(client_order_id) {
999 log::error!(
1000 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
1001 );
1002 error_count += 1;
1003 }
1004 }
1005
1006 for client_order_id in &self.index.orders_open {
1007 if !self.orders.contains_key(client_order_id) {
1008 log::error!(
1009 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
1010 );
1011 error_count += 1;
1012 }
1013 }
1014
1015 for client_order_id in &self.index.orders_closed {
1016 if !self.orders.contains_key(client_order_id) {
1017 log::error!(
1018 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
1019 );
1020 error_count += 1;
1021 }
1022 }
1023
1024 for position_id in &self.index.positions {
1025 if !self.positions.contains_key(position_id) {
1026 log::error!(
1027 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
1028 );
1029 error_count += 1;
1030 }
1031 }
1032
1033 for position_id in &self.index.positions_open {
1034 if !self.positions.contains_key(position_id) {
1035 log::error!(
1036 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
1037 );
1038 error_count += 1;
1039 }
1040 }
1041
1042 for position_id in &self.index.positions_closed {
1043 if !self.positions.contains_key(position_id) {
1044 log::error!(
1045 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
1046 );
1047 error_count += 1;
1048 }
1049 }
1050
1051 for strategy_id in &self.index.strategies {
1052 if !self.index.strategy_orders.contains_key(strategy_id) {
1053 log::error!(
1054 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
1055 );
1056 error_count += 1;
1057 }
1058 }
1059
1060 for exec_algorithm_id in &self.index.exec_algorithms {
1061 if !self
1062 .index
1063 .exec_algorithm_orders
1064 .contains_key(exec_algorithm_id)
1065 {
1066 log::error!(
1067 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
1068 );
1069 error_count += 1;
1070 }
1071 }
1072
1073 let total_us = SystemTime::now()
1074 .duration_since(UNIX_EPOCH)
1075 .expect("Time went backwards")
1076 .as_micros()
1077 - timestamp_us;
1078
1079 if error_count == 0 {
1080 log::info!("Integrity check passed in {total_us}μs");
1081 true
1082 } else {
1083 log::error!(
1084 "Integrity check failed with {error_count} error{} in {total_us}μs",
1085 if error_count == 1 { "" } else { "s" },
1086 );
1087 false
1088 }
1089 }
1090
1091 #[must_use]
1095 pub fn check_residuals(&self) -> bool {
1096 log::debug!("Checking residuals");
1097
1098 let mut residuals = false;
1099
1100 for order in self.orders_open(None, None, None, None, None) {
1102 residuals = true;
1103 log::warn!("Residual {order}");
1104 }
1105
1106 for position in self.positions_open(None, None, None, None, None) {
1108 residuals = true;
1109 log::warn!("Residual {position}");
1110 }
1111
1112 residuals
1113 }
1114
1115 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1121 log::debug!(
1122 "Purging closed orders{}",
1123 if buffer_secs > 0 {
1124 format!(" with buffer_secs={buffer_secs}")
1125 } else {
1126 String::new()
1127 }
1128 );
1129
1130 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1131
1132 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
1133
1134 'outer: for client_order_id in self.index.orders_closed.clone() {
1135 let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
1136 let order = order_cell.borrow();
1137 if order.is_closed()
1138 && let Some(ts_closed) = order.ts_closed()
1139 && ts_closed + buffer_ns <= ts_now
1140 {
1141 let linked = order.linked_order_ids().map(<[_]>::to_vec);
1142 let order_list_id = order.order_list_id();
1143 Some((linked, order_list_id))
1144 } else {
1145 None
1146 }
1147 });
1148
1149 let Some((linked, order_list_id)) = purge_target else {
1150 continue;
1151 };
1152
1153 if let Some(linked_order_ids) = linked {
1155 for linked_order_id in &linked_order_ids {
1156 if let Some(linked_order_cell) = self.orders.get(linked_order_id)
1157 && linked_order_cell.borrow().is_open()
1158 {
1159 continue 'outer;
1161 }
1162 }
1163 }
1164
1165 if let Some(order_list_id) = order_list_id {
1166 affected_order_list_ids.insert(order_list_id);
1167 }
1168
1169 self.purge_order(client_order_id);
1170 }
1171
1172 for order_list_id in affected_order_list_ids {
1173 if let Some(order_list) = self.order_lists.get(&order_list_id) {
1174 let all_purged = order_list
1175 .client_order_ids
1176 .iter()
1177 .all(|id| !self.orders.contains_key(id));
1178
1179 if all_purged {
1180 self.order_lists.remove(&order_list_id);
1181 log::info!("Purged {order_list_id}");
1182 }
1183 }
1184 }
1185 }
1186
1187 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1189 log::debug!(
1190 "Purging closed positions{}",
1191 if buffer_secs > 0 {
1192 format!(" with buffer_secs={buffer_secs}")
1193 } else {
1194 String::new()
1195 }
1196 );
1197
1198 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1199
1200 for position_id in self.index.positions_closed.clone() {
1201 let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
1202 let position = cell.borrow();
1203 position.is_closed()
1204 && position
1205 .ts_closed
1206 .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
1207 });
1208
1209 if should_purge {
1210 self.purge_position(position_id);
1211 }
1212 }
1213 }
1214
1215 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1219 let order_cell = self.orders.get(&client_order_id).cloned();
1221
1222 if let Some(ref order_cell) = order_cell
1224 && order_cell.borrow().is_open()
1225 {
1226 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1227 return;
1228 }
1229
1230 if let Some(ref order_cell) = order_cell {
1232 let order = order_cell.borrow();
1233 self.orders.remove(&client_order_id);
1235
1236 if let Some(venue_orders) = self
1238 .index
1239 .venue_orders
1240 .get_mut(&order.instrument_id().venue)
1241 {
1242 venue_orders.remove(&client_order_id);
1243 if venue_orders.is_empty() {
1244 self.index.venue_orders.remove(&order.instrument_id().venue);
1245 }
1246 }
1247
1248 if let Some(venue_order_id) = order.venue_order_id() {
1250 self.index.venue_order_ids.remove(&venue_order_id);
1251 }
1252
1253 if let Some(instrument_orders) =
1255 self.index.instrument_orders.get_mut(&order.instrument_id())
1256 {
1257 instrument_orders.remove(&client_order_id);
1258 if instrument_orders.is_empty() {
1259 self.index.instrument_orders.remove(&order.instrument_id());
1260 }
1261 }
1262
1263 if let Some(position_id) = order.position_id()
1265 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1266 {
1267 position_orders.remove(&client_order_id);
1268 if position_orders.is_empty() {
1269 self.index.position_orders.remove(&position_id);
1270 }
1271 }
1272
1273 if let Some(exec_algorithm_id) = order.exec_algorithm_id()
1275 && let Some(exec_algorithm_orders) =
1276 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1277 {
1278 exec_algorithm_orders.remove(&client_order_id);
1279 if exec_algorithm_orders.is_empty() {
1280 self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1281 }
1282 }
1283
1284 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
1286 {
1287 strategy_orders.remove(&client_order_id);
1288 if strategy_orders.is_empty() {
1289 self.index.strategy_orders.remove(&order.strategy_id());
1290 }
1291 }
1292
1293 if let Some(account_id) = order.account_id()
1295 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1296 {
1297 account_orders.remove(&client_order_id);
1298 if account_orders.is_empty() {
1299 self.index.account_orders.remove(&account_id);
1300 }
1301 }
1302
1303 if let Some(exec_spawn_id) = order.exec_spawn_id()
1305 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1306 {
1307 spawn_orders.remove(&client_order_id);
1308 if spawn_orders.is_empty() {
1309 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1310 }
1311 }
1312
1313 log::info!("Purged order {client_order_id}");
1314 } else {
1315 log::warn!("Order {client_order_id} not found when purging");
1316 }
1317
1318 self.index.order_position.remove(&client_order_id);
1320 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1321 self.index.order_client.remove(&client_order_id);
1322 self.index.client_order_ids.remove(&client_order_id);
1323
1324 if let Some(strategy_id) = strategy_id
1326 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1327 {
1328 strategy_orders.remove(&client_order_id);
1329 if strategy_orders.is_empty() {
1330 self.index.strategy_orders.remove(&strategy_id);
1331 }
1332 }
1333
1334 self.index.exec_spawn_orders.remove(&client_order_id);
1336
1337 self.index.orders.remove(&client_order_id);
1338 self.index.orders_active_local.remove(&client_order_id);
1339 self.index.orders_open.remove(&client_order_id);
1340 self.index.orders_closed.remove(&client_order_id);
1341 self.index.orders_emulated.remove(&client_order_id);
1342 self.index.orders_inflight.remove(&client_order_id);
1343 self.index.orders_pending_cancel.remove(&client_order_id);
1344 }
1345
1346 pub fn purge_position(&mut self, position_id: PositionId) {
1350 let position = self
1352 .positions
1353 .get(&position_id)
1354 .map(|cell| cell.borrow().clone());
1355
1356 if let Some(ref pos) = position
1358 && pos.is_open()
1359 {
1360 log::warn!("Position {position_id} found open when purging, skipping purge");
1361 return;
1362 }
1363
1364 if let Some(ref pos) = position {
1366 self.positions.remove(&position_id);
1367
1368 if let Some(venue_positions) =
1370 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1371 {
1372 venue_positions.remove(&position_id);
1373 if venue_positions.is_empty() {
1374 self.index.venue_positions.remove(&pos.instrument_id.venue);
1375 }
1376 }
1377
1378 if let Some(instrument_positions) =
1380 self.index.instrument_positions.get_mut(&pos.instrument_id)
1381 {
1382 instrument_positions.remove(&position_id);
1383 if instrument_positions.is_empty() {
1384 self.index.instrument_positions.remove(&pos.instrument_id);
1385 }
1386 }
1387
1388 if let Some(strategy_positions) =
1390 self.index.strategy_positions.get_mut(&pos.strategy_id)
1391 {
1392 strategy_positions.remove(&position_id);
1393 if strategy_positions.is_empty() {
1394 self.index.strategy_positions.remove(&pos.strategy_id);
1395 }
1396 }
1397
1398 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1400 account_positions.remove(&position_id);
1401 if account_positions.is_empty() {
1402 self.index.account_positions.remove(&pos.account_id);
1403 }
1404 }
1405
1406 for client_order_id in pos.client_order_ids() {
1408 self.index.order_position.remove(&client_order_id);
1409 }
1410
1411 log::info!("Purged position {position_id}");
1412 } else {
1413 log::warn!("Position {position_id} not found when purging");
1414 }
1415
1416 self.index.position_strategy.remove(&position_id);
1418 self.index.position_orders.remove(&position_id);
1419 self.index.positions.remove(&position_id);
1420 self.index.positions_open.remove(&position_id);
1421 self.index.positions_closed.remove(&position_id);
1422
1423 self.position_snapshots.remove(&position_id);
1425 }
1426
1427 pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
1451 #[cfg(feature = "defi")]
1452 let defi_found = self.defi.pools.contains_key(&instrument_id)
1453 || self.defi.pool_profilers.contains_key(&instrument_id);
1454 #[cfg(not(feature = "defi"))]
1455 let defi_found = false;
1456
1457 let found = self.instruments.contains_key(&instrument_id)
1458 || self.synthetics.contains_key(&instrument_id)
1459 || defi_found;
1460
1461 if !found {
1462 log::warn!("Instrument {instrument_id} not found when purging");
1463 return;
1464 }
1465
1466 if let Some(orders) = self.index.instrument_orders.get(&instrument_id) {
1467 let has_non_terminal = orders
1468 .iter()
1469 .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
1470
1471 if has_non_terminal {
1472 log::warn!(
1473 "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
1474 );
1475 return;
1476 }
1477 }
1478
1479 if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
1480 let has_non_closed = positions
1481 .iter()
1482 .any(|position_id| !self.index.positions_closed.contains(position_id));
1483
1484 if has_non_closed {
1485 log::warn!(
1486 "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
1487 );
1488 return;
1489 }
1490 }
1491
1492 self.instruments.remove(&instrument_id);
1493 self.synthetics.remove(&instrument_id);
1494 self.books.remove(&instrument_id);
1495 self.own_books.remove(&instrument_id);
1496 self.quotes.remove(&instrument_id);
1497 self.trades.remove(&instrument_id);
1498 self.mark_prices.remove(&instrument_id);
1499 self.index_prices.remove(&instrument_id);
1500 self.funding_rates.remove(&instrument_id);
1501 self.instrument_statuses.remove(&instrument_id);
1502 self.greeks.remove(&instrument_id);
1503 self.option_greeks.remove(&instrument_id);
1504
1505 self.bars
1506 .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
1507
1508 #[cfg(feature = "defi")]
1509 {
1510 self.defi.pools.remove(&instrument_id);
1511 self.defi.pool_profilers.remove(&instrument_id);
1512 }
1513
1514 self.index.instrument_orders.remove(&instrument_id);
1515 self.index.instrument_positions.remove(&instrument_id);
1516
1517 log::info!("Purged instrument {instrument_id}");
1518 }
1519
1520 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1525 log::debug!(
1526 "Purging account events{}",
1527 if lookback_secs > 0 {
1528 format!(" with lookback_secs={lookback_secs}")
1529 } else {
1530 String::new()
1531 }
1532 );
1533
1534 for account_cell in self.accounts.values() {
1535 let mut account = account_cell.borrow_mut();
1536 let event_count = account.event_count();
1537 account.purge_account_events(ts_now, lookback_secs);
1538 let count_diff = event_count - account.event_count();
1539 if count_diff > 0 {
1540 log::info!(
1541 "Purged {} event(s) from account {}",
1542 count_diff,
1543 account.id()
1544 );
1545 }
1546 }
1547 }
1548
1549 pub fn clear_index(&mut self) {
1551 self.index.clear();
1552 log::debug!("Cleared index");
1553 }
1554
1555 pub fn reset(&mut self) {
1561 log::debug!("Resetting cache");
1562
1563 self.general.clear();
1564 self.books.clear();
1565 self.own_books.clear();
1566 self.quotes.clear();
1567 self.trades.clear();
1568 self.mark_xrates.clear();
1569 self.mark_prices.clear();
1570 self.index_prices.clear();
1571 self.funding_rates.clear();
1572 self.instrument_statuses.clear();
1573 self.bars.clear();
1574 self.accounts.clear();
1575 self.orders.clear();
1576 self.order_lists.clear();
1577 self.positions.clear();
1578 self.position_snapshots.clear();
1579 self.greeks.clear();
1580 self.yield_curves.clear();
1581
1582 if self.config.drop_instruments_on_reset {
1583 self.currencies.clear();
1584 self.instruments.clear();
1585 self.synthetics.clear();
1586 }
1587
1588 #[cfg(feature = "defi")]
1589 {
1590 self.defi.pools.clear();
1591 self.defi.pool_profilers.clear();
1592 }
1593
1594 self.clear_index();
1595
1596 log::info!("Reset cache");
1597 }
1598
1599 pub fn dispose(&mut self) {
1603 self.reset();
1604
1605 if let Some(database) = &mut self.database
1606 && let Err(e) = database.close()
1607 {
1608 log::error!("Failed to close database during dispose: {e}");
1609 }
1610 }
1611
1612 pub fn flush_db(&mut self) {
1616 if let Some(database) = &mut self.database
1617 && let Err(e) = database.flush()
1618 {
1619 log::error!("Failed to flush database: {e}");
1620 }
1621 }
1622
1623 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1631 check_valid_string_ascii(key, stringify!(key))?;
1632 check_predicate_false(value.is_empty(), stringify!(value))?;
1633
1634 log::debug!("Adding general {key}");
1635 self.general.insert(key.to_string(), value.clone());
1636
1637 if let Some(database) = &mut self.database {
1638 database.add(key.to_string(), value)?;
1639 }
1640 Ok(())
1641 }
1642
1643 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1649 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1650
1651 if self.config.save_market_data
1652 && let Some(database) = &mut self.database
1653 {
1654 database.add_order_book(&book)?;
1655 }
1656
1657 self.books.insert(book.instrument_id, book);
1658 Ok(())
1659 }
1660
1661 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1667 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1668
1669 self.own_books.insert(own_book.instrument_id, own_book);
1670 Ok(())
1671 }
1672
1673 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1679 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1680
1681 if self.config.save_market_data {
1682 }
1684
1685 let mark_prices_deque = self
1686 .mark_prices
1687 .entry(mark_price.instrument_id)
1688 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1689 mark_prices_deque.push_front(mark_price);
1690 Ok(())
1691 }
1692
1693 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1699 log::debug!(
1700 "Adding `IndexPriceUpdate` for {}",
1701 index_price.instrument_id
1702 );
1703
1704 if self.config.save_market_data {
1705 }
1707
1708 let index_prices_deque = self
1709 .index_prices
1710 .entry(index_price.instrument_id)
1711 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1712 index_prices_deque.push_front(index_price);
1713 Ok(())
1714 }
1715
1716 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1722 log::debug!(
1723 "Adding `FundingRateUpdate` for {}",
1724 funding_rate.instrument_id
1725 );
1726
1727 if self.config.save_market_data {
1728 }
1730
1731 let funding_rates_deque = self
1732 .funding_rates
1733 .entry(funding_rate.instrument_id)
1734 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1735 funding_rates_deque.push_front(funding_rate);
1736 Ok(())
1737 }
1738
1739 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1745 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1746
1747 let instrument_id = funding_rates[0].instrument_id;
1748 log::debug!(
1749 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1750 funding_rates.len()
1751 );
1752
1753 if self.config.save_market_data
1754 && let Some(database) = &mut self.database
1755 {
1756 for funding_rate in funding_rates {
1757 database.add_funding_rate(funding_rate)?;
1758 }
1759 }
1760
1761 let funding_rate_deque = self
1762 .funding_rates
1763 .entry(instrument_id)
1764 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1765
1766 for funding_rate in funding_rates {
1767 funding_rate_deque.push_front(*funding_rate);
1768 }
1769 Ok(())
1770 }
1771
1772 pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1778 log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1779
1780 if self.config.save_market_data {
1781 }
1783
1784 let statuses_deque = self
1785 .instrument_statuses
1786 .entry(status.instrument_id)
1787 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1788 statuses_deque.push_front(status);
1789 Ok(())
1790 }
1791
1792 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1798 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1799
1800 if self.config.save_market_data
1801 && let Some(database) = &mut self.database
1802 {
1803 database.add_quote("e)?;
1804 }
1805
1806 let quotes_deque = self
1807 .quotes
1808 .entry(quote.instrument_id)
1809 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1810 quotes_deque.push_front(quote);
1811 Ok(())
1812 }
1813
1814 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1820 check_slice_not_empty(quotes, stringify!(quotes))?;
1821
1822 let instrument_id = quotes[0].instrument_id;
1823 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1824
1825 if self.config.save_market_data
1826 && let Some(database) = &mut self.database
1827 {
1828 for quote in quotes {
1829 database.add_quote(quote)?;
1830 }
1831 }
1832
1833 let quotes_deque = self
1834 .quotes
1835 .entry(instrument_id)
1836 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1837
1838 for quote in quotes {
1839 quotes_deque.push_front(*quote);
1840 }
1841 Ok(())
1842 }
1843
1844 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1850 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1851
1852 if self.config.save_market_data
1853 && let Some(database) = &mut self.database
1854 {
1855 database.add_trade(&trade)?;
1856 }
1857
1858 let trades_deque = self
1859 .trades
1860 .entry(trade.instrument_id)
1861 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1862 trades_deque.push_front(trade);
1863 Ok(())
1864 }
1865
1866 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1872 check_slice_not_empty(trades, stringify!(trades))?;
1873
1874 let instrument_id = trades[0].instrument_id;
1875 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1876
1877 if self.config.save_market_data
1878 && let Some(database) = &mut self.database
1879 {
1880 for trade in trades {
1881 database.add_trade(trade)?;
1882 }
1883 }
1884
1885 let trades_deque = self
1886 .trades
1887 .entry(instrument_id)
1888 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1889
1890 for trade in trades {
1891 trades_deque.push_front(*trade);
1892 }
1893 Ok(())
1894 }
1895
1896 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1902 log::debug!("Adding `Bar` {}", bar.bar_type);
1903
1904 if self.config.save_market_data
1905 && let Some(database) = &mut self.database
1906 {
1907 database.add_bar(&bar)?;
1908 }
1909
1910 let bars = self
1911 .bars
1912 .entry(bar.bar_type)
1913 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1914 bars.push_front(bar);
1915 Ok(())
1916 }
1917
1918 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1924 check_slice_not_empty(bars, stringify!(bars))?;
1925
1926 let bar_type = bars[0].bar_type;
1927 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1928
1929 if self.config.save_market_data
1930 && let Some(database) = &mut self.database
1931 {
1932 for bar in bars {
1933 database.add_bar(bar)?;
1934 }
1935 }
1936
1937 let bars_deque = self
1938 .bars
1939 .entry(bar_type)
1940 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1941
1942 for bar in bars {
1943 bars_deque.push_front(*bar);
1944 }
1945 Ok(())
1946 }
1947
1948 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1954 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1955
1956 if self.config.save_market_data
1957 && let Some(_database) = &mut self.database
1958 {
1959 }
1961
1962 self.greeks.insert(greeks.instrument_id, greeks);
1963 Ok(())
1964 }
1965
1966 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1968 self.greeks.get(instrument_id).cloned()
1969 }
1970
1971 pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1973 log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1974 self.option_greeks.insert(greeks.instrument_id, greeks);
1975 }
1976
1977 #[must_use]
1979 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1980 self.option_greeks.get(instrument_id)
1981 }
1982
1983 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1989 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1990
1991 if self.config.save_market_data
1992 && let Some(_database) = &mut self.database
1993 {
1994 }
1996
1997 self.yield_curves
1998 .insert(yield_curve.curve_name.clone(), yield_curve);
1999 Ok(())
2000 }
2001
2002 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
2004 self.yield_curves.get(key).map(|curve| {
2005 let curve_clone = curve.clone();
2006 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
2007 as Box<dyn Fn(f64) -> f64>
2008 })
2009 }
2010
2011 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
2017 if self.currencies.contains_key(¤cy.code) {
2018 return Ok(());
2019 }
2020 log::debug!("Adding `Currency` {}", currency.code);
2021
2022 if let Some(database) = &mut self.database {
2023 database.add_currency(¤cy)?;
2024 }
2025
2026 self.currencies.insert(currency.code, currency);
2027 Ok(())
2028 }
2029
2030 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
2036 log::debug!("Adding `Instrument` {}", instrument.id());
2037
2038 if let Some(base_currency) = instrument.base_currency() {
2040 self.add_currency(base_currency)?;
2041 }
2042 self.add_currency(instrument.quote_currency())?;
2043 self.add_currency(instrument.settlement_currency())?;
2044
2045 if let Some(database) = &mut self.database {
2046 database.add_instrument(&instrument)?;
2047 }
2048
2049 self.instruments.insert(instrument.id(), instrument);
2050 Ok(())
2051 }
2052
2053 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
2059 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
2060
2061 if let Some(database) = &mut self.database {
2062 database.add_synthetic(&synthetic)?;
2063 }
2064
2065 self.synthetics.insert(synthetic.id, synthetic);
2066 Ok(())
2067 }
2068
2069 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2075 log::debug!("Adding `Account` {}", account.id());
2076
2077 if let Some(database) = &mut self.database {
2078 database.add_account(&account)?;
2079 }
2080
2081 let account_id = account.id();
2082 self.accounts.insert(account_id, SharedCell::new(account));
2083 self.index
2084 .venue_account
2085 .insert(account_id.get_issuer(), account_id);
2086 Ok(())
2087 }
2088
2089 pub fn add_venue_order_id(
2097 &mut self,
2098 client_order_id: &ClientOrderId,
2099 venue_order_id: &VenueOrderId,
2100 overwrite: bool,
2101 ) -> anyhow::Result<()> {
2102 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
2103 && !overwrite
2104 && existing_venue_order_id != venue_order_id
2105 {
2106 anyhow::bail!(
2107 "Existing {existing_venue_order_id} for {client_order_id}
2108 did not match the given {venue_order_id}.
2109 If you are writing a test then try a different `venue_order_id`,
2110 otherwise this is probably a bug."
2111 );
2112 }
2113
2114 self.index
2115 .client_order_ids
2116 .insert(*client_order_id, *venue_order_id);
2117 self.index
2118 .venue_order_ids
2119 .insert(*venue_order_id, *client_order_id);
2120
2121 Ok(())
2122 }
2123
2124 pub fn add_order(
2136 &mut self,
2137 order: OrderAny,
2138 position_id: Option<PositionId>,
2139 client_id: Option<ClientId>,
2140 replace_existing: bool,
2141 ) -> anyhow::Result<()> {
2142 let instrument_id = order.instrument_id();
2143 let venue = instrument_id.venue;
2144 let client_order_id = order.client_order_id();
2145 let strategy_id = order.strategy_id();
2146 let exec_algorithm_id = order.exec_algorithm_id();
2147 let exec_spawn_id = order.exec_spawn_id();
2148
2149 if !replace_existing {
2150 check_key_not_in_map(
2151 &client_order_id,
2152 &self.orders,
2153 stringify!(client_order_id),
2154 stringify!(orders),
2155 )?;
2156 }
2157
2158 log::debug!("Adding {order:?}");
2159
2160 self.index.orders.insert(client_order_id);
2161
2162 if order.is_active_local() {
2163 self.index.orders_active_local.insert(client_order_id);
2164 }
2165 self.index
2166 .order_strategy
2167 .insert(client_order_id, strategy_id);
2168 self.index.strategies.insert(strategy_id);
2169
2170 self.index
2172 .venue_orders
2173 .entry(venue)
2174 .or_default()
2175 .insert(client_order_id);
2176
2177 self.index
2179 .instrument_orders
2180 .entry(instrument_id)
2181 .or_default()
2182 .insert(client_order_id);
2183
2184 self.index
2186 .strategy_orders
2187 .entry(strategy_id)
2188 .or_default()
2189 .insert(client_order_id);
2190
2191 if let Some(account_id) = order.account_id() {
2193 self.index
2194 .account_orders
2195 .entry(account_id)
2196 .or_default()
2197 .insert(client_order_id);
2198 }
2199
2200 if let Some(exec_algorithm_id) = exec_algorithm_id {
2202 self.index.exec_algorithms.insert(exec_algorithm_id);
2203
2204 self.index
2205 .exec_algorithm_orders
2206 .entry(exec_algorithm_id)
2207 .or_default()
2208 .insert(client_order_id);
2209 }
2210
2211 if let Some(exec_spawn_id) = exec_spawn_id {
2213 self.index
2214 .exec_spawn_orders
2215 .entry(exec_spawn_id)
2216 .or_default()
2217 .insert(client_order_id);
2218 }
2219
2220 if let Some(emulation_trigger) = order.emulation_trigger()
2222 && emulation_trigger != TriggerType::NoTrigger
2223 {
2224 self.index.orders_emulated.insert(client_order_id);
2225 }
2226
2227 if let Some(position_id) = position_id {
2229 self.add_position_id(
2230 &position_id,
2231 &order.instrument_id().venue,
2232 &client_order_id,
2233 &strategy_id,
2234 )?;
2235 }
2236
2237 if let Some(client_id) = client_id {
2239 self.index.order_client.insert(client_order_id, client_id);
2240 log::debug!("Indexed {client_id:?}");
2241 }
2242
2243 if let Some(database) = &mut self.database {
2244 database.add_order(&order, client_id)?;
2245 }
2250
2251 match self.orders.get(&client_order_id) {
2252 Some(order_cell) => *order_cell.borrow_mut() = order,
2255 None => {
2256 self.orders.insert(client_order_id, SharedCell::new(order));
2257 }
2258 }
2259
2260 Ok(())
2261 }
2262
2263 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
2269 let order_list_id = order_list.id;
2270 check_key_not_in_map(
2271 &order_list_id,
2272 &self.order_lists,
2273 stringify!(order_list_id),
2274 stringify!(order_lists),
2275 )?;
2276
2277 log::debug!("Adding {order_list:?}");
2278 self.order_lists.insert(order_list_id, order_list);
2279 Ok(())
2280 }
2281
2282 pub fn add_position_id(
2288 &mut self,
2289 position_id: &PositionId,
2290 venue: &Venue,
2291 client_order_id: &ClientOrderId,
2292 strategy_id: &StrategyId,
2293 ) -> anyhow::Result<()> {
2294 self.index
2295 .order_position
2296 .insert(*client_order_id, *position_id);
2297
2298 if let Some(database) = &mut self.database {
2300 database.index_order_position(*client_order_id, *position_id)?;
2301 }
2302
2303 self.index
2305 .position_strategy
2306 .insert(*position_id, *strategy_id);
2307
2308 self.index
2310 .position_orders
2311 .entry(*position_id)
2312 .or_default()
2313 .insert(*client_order_id);
2314
2315 self.index
2317 .strategy_positions
2318 .entry(*strategy_id)
2319 .or_default()
2320 .insert(*position_id);
2321
2322 self.index
2324 .venue_positions
2325 .entry(*venue)
2326 .or_default()
2327 .insert(*position_id);
2328
2329 Ok(())
2330 }
2331
2332 fn assign_position_ids_to_contingencies(&mut self) {
2341 let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2342
2343 for parent_order_cell in self.orders.values() {
2344 let parent = parent_order_cell.borrow();
2345 if parent.contingency_type() != Some(ContingencyType::Oto) {
2346 continue;
2347 }
2348 let Some(parent_position_id) = parent.position_id() else {
2349 continue;
2350 };
2351 let Some(linked_order_ids) = parent.linked_order_ids() else {
2352 continue;
2353 };
2354
2355 for client_order_id in linked_order_ids {
2356 match self.orders.get(client_order_id) {
2357 None => {
2358 log::error!("Contingency order {client_order_id} not found");
2359 }
2360 Some(contingent_order_cell) => {
2361 if contingent_order_cell.borrow().position_id().is_none() {
2362 assignments.push((parent_position_id, *client_order_id));
2363 }
2364 }
2365 }
2366 }
2367 }
2368
2369 for (position_id, client_order_id) in assignments {
2370 let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
2371 let mut contingent = order_cell.borrow_mut();
2372 contingent.set_position_id(Some(position_id));
2373 (contingent.instrument_id().venue, contingent.strategy_id())
2374 }) else {
2375 continue;
2376 };
2377
2378 self.index
2385 .order_position
2386 .insert(client_order_id, position_id);
2387 self.index
2388 .position_strategy
2389 .insert(position_id, strategy_id);
2390 self.index
2391 .position_orders
2392 .entry(position_id)
2393 .or_default()
2394 .insert(client_order_id);
2395 self.index
2396 .strategy_positions
2397 .entry(strategy_id)
2398 .or_default()
2399 .insert(position_id);
2400 self.index
2401 .venue_positions
2402 .entry(venue)
2403 .or_default()
2404 .insert(position_id);
2405 }
2406 }
2407
2408 pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2414 self.positions
2415 .insert(position.id, SharedCell::new(position.clone()));
2416 self.index.positions.insert(position.id);
2417 self.index.positions_open.insert(position.id);
2418 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
2421
2422 self.add_position_id(
2423 &position.id,
2424 &position.instrument_id.venue,
2425 &position.opening_order_id,
2426 &position.strategy_id,
2427 )?;
2428
2429 let venue = position.instrument_id.venue;
2430 let venue_positions = self.index.venue_positions.entry(venue).or_default();
2431 venue_positions.insert(position.id);
2432
2433 let instrument_id = position.instrument_id;
2435 let instrument_positions = self
2436 .index
2437 .instrument_positions
2438 .entry(instrument_id)
2439 .or_default();
2440 instrument_positions.insert(position.id);
2441
2442 self.index
2444 .account_positions
2445 .entry(position.account_id)
2446 .or_default()
2447 .insert(position.id);
2448
2449 if let Some(database) = &mut self.database {
2450 database.add_position(position)?;
2451 }
2460
2461 Ok(())
2462 }
2463
2464 pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2473 let account_id = account.id();
2474 match self.accounts.get(&account_id) {
2475 Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
2476 None => {
2477 self.accounts
2478 .insert(account_id, SharedCell::new(account.clone()));
2479 }
2480 }
2481
2482 if let Some(database) = &mut self.database {
2483 database.update_account(account)?;
2484 }
2485 Ok(())
2486 }
2487
2488 #[must_use]
2503 pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
2504 self.accounts.remove(account_id).map(|cell| {
2505 let rc: Rc<RefCell<AccountAny>> = cell.into();
2506 Rc::try_unwrap(rc).map_or_else(
2507 |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
2508 RefCell::into_inner,
2509 )
2510 })
2511 }
2512
2513 pub fn cache_account_owned(&mut self, account: AccountAny) {
2515 let account_id = account.id();
2516 self.index
2517 .venue_account
2518 .insert(account_id.get_issuer(), account_id);
2519 match self.accounts.get(&account_id) {
2520 Some(account_cell) => *account_cell.borrow_mut() = account,
2521 None => {
2522 self.accounts.insert(account_id, SharedCell::new(account));
2523 }
2524 }
2525 }
2526
2527 pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
2533 let account_id = account.id();
2534 self.cache_account_owned(account);
2535
2536 if let Some(database) = &mut self.database {
2537 let Some(account_cell) = self.accounts.get(&account_id) else {
2538 anyhow::bail!("Account {account_id} not found after cache update");
2539 };
2540 database.update_account(&account_cell.borrow())?;
2541 }
2542 Ok(())
2543 }
2544
2545 pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
2555 let Some(cell) = self.accounts.get(&event.account_id) else {
2556 return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
2557 };
2558
2559 cell.borrow_mut().apply(event.clone())?;
2560
2561 if let Some(database) = &mut self.database {
2562 database.update_account(&cell.borrow())?;
2563 }
2564 Ok(())
2565 }
2566
2567 pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2576 self.refresh_order(order)?;
2577
2578 let client_order_id = order.client_order_id();
2579 match self.orders.get(&client_order_id) {
2580 Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
2583 None => {
2584 self.orders
2585 .insert(client_order_id, SharedCell::new(order.clone()));
2586 }
2587 }
2588
2589 Ok(())
2590 }
2591
2592 pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
2598 let event_client_order_id = event.client_order_id();
2599 let client_order_id = if self.order_exists(&event_client_order_id) {
2600 event_client_order_id
2601 } else if let Some(venue_order_id) = event.venue_order_id() {
2602 self.index
2603 .venue_order_ids
2604 .get(&venue_order_id)
2605 .copied()
2606 .ok_or(OrderError::NotFound(event_client_order_id))?
2607 } else {
2608 return Err(OrderError::NotFound(event_client_order_id).into());
2609 };
2610
2611 let order_cell = self
2612 .orders
2613 .get(&client_order_id)
2614 .cloned()
2615 .ok_or(OrderError::NotFound(client_order_id))?;
2616
2617 let mut snapshot = order_cell.borrow().clone();
2621 snapshot.apply(event.clone())?;
2622 *order_cell.borrow_mut() = snapshot.clone();
2623
2624 if let Err(e) = self.refresh_order(&snapshot) {
2625 log::error!("Error updating order in cache: {e}");
2626 }
2627
2628 Ok(snapshot)
2629 }
2630
2631 fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2632 let client_order_id = order.client_order_id();
2633
2634 if order.is_active_local() {
2635 self.index.orders_active_local.insert(client_order_id);
2636 } else {
2637 self.index.orders_active_local.remove(&client_order_id);
2638 }
2639
2640 if let Some(venue_order_id) = order.venue_order_id() {
2642 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2645 let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
2646 if let Err(e) =
2647 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
2648 {
2649 log::error!("Error indexing venue order ID in cache: {e}");
2650 }
2651 }
2652 }
2653
2654 if order.is_inflight() {
2656 self.index.orders_inflight.insert(client_order_id);
2657 } else {
2658 self.index.orders_inflight.remove(&client_order_id);
2659 }
2660
2661 if order.is_open() {
2663 self.index.orders_closed.remove(&client_order_id);
2664 self.index.orders_open.insert(client_order_id);
2665 } else if order.is_closed() {
2666 self.index.orders_open.remove(&client_order_id);
2667 self.index.orders_pending_cancel.remove(&client_order_id);
2668 self.index.orders_closed.insert(client_order_id);
2669 }
2670
2671 if let Some(emulation_trigger) = order.emulation_trigger()
2673 && emulation_trigger != TriggerType::NoTrigger
2674 && !order.is_closed()
2675 {
2676 self.index.orders_emulated.insert(client_order_id);
2677 } else {
2678 self.index.orders_emulated.remove(&client_order_id);
2679 }
2680
2681 if let Some(account_id) = order.account_id() {
2683 self.index
2684 .account_orders
2685 .entry(account_id)
2686 .or_default()
2687 .insert(client_order_id);
2688 }
2689
2690 if !self.own_books.is_empty() {
2692 let own_book = self.own_order_book(&order.instrument_id());
2693 if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
2694 self.update_own_order_book(order);
2695 }
2696 }
2697
2698 if let Some(database) = &mut self.database {
2699 database.update_order(order.last_event())?;
2700 }
2705
2706 Ok(())
2707 }
2708
2709 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2711 self.index
2712 .orders_pending_cancel
2713 .insert(order.client_order_id());
2714 }
2715
2716 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2725 if position.is_open() {
2728 self.index.positions_open.insert(position.id);
2729 self.index.positions_closed.remove(&position.id);
2730 } else {
2731 self.index.positions_closed.insert(position.id);
2732 self.index.positions_open.remove(&position.id);
2733 }
2734
2735 if let Some(database) = &mut self.database {
2736 database.update_position(position)?;
2737 }
2742
2743 match self.positions.get(&position.id) {
2744 Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
2745 None => {
2746 self.positions
2747 .insert(position.id, SharedCell::new(position.clone()));
2748 }
2749 }
2750
2751 Ok(())
2752 }
2753
2754 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
2761 let position_id = position.id;
2762
2763 let mut copied_position = position.clone();
2764 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2765 copied_position.id = PositionId::new(new_id);
2766
2767 let position_serialized = serde_json::to_vec(&copied_position)?;
2769 let snapshot_index = self.position_snapshot_count(&position_id);
2770 let blob_ref = format!(
2771 "cache://position-snapshots/{}/{}",
2772 position_id.as_str(),
2773 snapshot_index,
2774 );
2775 let snapshot_blob = Bytes::from(position_serialized);
2776
2777 self.add(&blob_ref, snapshot_blob.clone())?;
2778 self.position_snapshots
2779 .entry(position_id)
2780 .or_default()
2781 .push(snapshot_blob.clone());
2782
2783 log::debug!("Snapshot {copied_position}");
2784 Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
2785 }
2786
2787 pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
2797 if let Some(blob) = self.snapshot_blob(blob_ref) {
2798 return Ok(Some(blob));
2799 }
2800
2801 if self.database.is_some() {
2802 self.cache_general()?;
2803 }
2804
2805 Ok(self.snapshot_blob(blob_ref))
2806 }
2807
2808 pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
2818 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
2819 validate_position_snapshot_blob(&position_id, blob.as_ref())?;
2820
2821 let frames = self.position_snapshots.entry(position_id).or_default();
2822 match frames.get(snapshot_index) {
2823 Some(existing) if existing == &blob => {}
2824 Some(_) => {
2825 anyhow::bail!(
2826 "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
2827 );
2828 }
2829 None if frames.len() == snapshot_index => frames.push(blob.clone()),
2830 None => {
2831 anyhow::bail!(
2832 "position snapshot blob_ref {blob_ref} skips missing frame {}",
2833 frames.len()
2834 );
2835 }
2836 }
2837
2838 self.general.insert(blob_ref.to_string(), blob);
2839 Ok(())
2840 }
2841
2842 fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
2843 if let Some(blob) = self.general.get(blob_ref) {
2844 return Some(blob.clone());
2845 }
2846
2847 let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
2848 self.position_snapshots
2849 .get(&position_id)
2850 .and_then(|frames| frames.get(snapshot_index))
2851 .cloned()
2852 }
2853
2854 pub fn snapshot_position_state(
2860 &mut self,
2861 position: &Position,
2862 open_only: Option<bool>,
2865 ) -> anyhow::Result<()> {
2866 let open_only = open_only.unwrap_or(true);
2867
2868 if open_only && !position.is_open() {
2869 return Ok(());
2870 }
2871
2872 if let Some(database) = &mut self.database {
2873 database.snapshot_position_state(position).map_err(|e| {
2874 log::error!(
2875 "Failed to snapshot position state for {}: {e:?}",
2876 position.id
2877 );
2878 e
2879 })?;
2880 } else {
2881 log::warn!(
2882 "Cannot snapshot position state for {} (no database configured)",
2883 position.id
2884 );
2885 }
2886
2887 todo!()
2889 }
2890
2891 #[must_use]
2893 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2894 if self.index.position_strategy.contains_key(position_id) {
2896 Some(OmsType::Netting)
2899 } else {
2900 None
2901 }
2902 }
2903
2904 #[must_use]
2909 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2910 self.position_snapshots
2911 .get(position_id)
2912 .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2913 }
2914
2915 #[must_use]
2919 pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2920 self.position_snapshots.get(position_id).map_or(0, Vec::len)
2921 }
2922
2923 #[must_use]
2929 pub fn position_snapshots(
2930 &self,
2931 position_id: Option<&PositionId>,
2932 account_id: Option<&AccountId>,
2933 ) -> Vec<Position> {
2934 let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2935 Some(pid) => match self.position_snapshots.get(pid) {
2936 Some(v) => Box::new(v.iter()),
2937 None => Box::new(std::iter::empty()),
2938 },
2939 None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2940 };
2941
2942 let mut results: Vec<Position> = frames
2943 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2944 Ok(position) => Some(position),
2945 Err(e) => {
2946 log::warn!("Failed to decode position snapshot: {e}");
2947 None
2948 }
2949 })
2950 .collect();
2951
2952 if let Some(aid) = account_id {
2953 results.retain(|p| p.account_id == *aid);
2954 }
2955
2956 results
2957 }
2958
2959 #[must_use]
2965 pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2966 let Some(frames) = self.position_snapshots.get(position_id) else {
2967 return Vec::new();
2968 };
2969
2970 frames
2971 .iter()
2972 .skip(skip)
2973 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2974 Ok(position) => Some(position),
2975 Err(e) => {
2976 log::warn!("Failed to decode position snapshot: {e}");
2977 None
2978 }
2979 })
2980 .collect()
2981 }
2982
2983 #[must_use]
2985 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2986 let mut result = AHashSet::new();
2988
2989 for (position_id, _) in &self.position_snapshots {
2990 if let Some(position_cell) = self.positions.get(position_id)
2992 && position_cell.borrow().instrument_id == *instrument_id
2993 {
2994 result.insert(*position_id);
2995 }
2996 }
2997 result
2998 }
2999
3000 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
3006 let Some(database) = &self.database else {
3007 log::warn!(
3008 "Cannot snapshot order state for {} (no database configured)",
3009 order.client_order_id()
3010 );
3011 return Ok(());
3012 };
3013
3014 database.snapshot_order_state(order)
3015 }
3016
3017 fn collect_order_filter_sources<'a>(
3028 &'a self,
3029 venue: Option<&Venue>,
3030 instrument_id: Option<&InstrumentId>,
3031 strategy_id: Option<&StrategyId>,
3032 account_id: Option<&AccountId>,
3033 ) -> FilterSources<'a, ClientOrderId> {
3034 let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
3035
3036 if let Some(venue) = venue {
3037 match self.index.venue_orders.get(venue) {
3038 Some(set) => sources.push(set),
3039 None => return FilterSources::Empty,
3040 }
3041 }
3042
3043 if let Some(instrument_id) = instrument_id {
3044 match self.index.instrument_orders.get(instrument_id) {
3045 Some(set) => sources.push(set),
3046 None => return FilterSources::Empty,
3047 }
3048 }
3049
3050 if let Some(strategy_id) = strategy_id {
3051 match self.index.strategy_orders.get(strategy_id) {
3052 Some(set) => sources.push(set),
3053 None => return FilterSources::Empty,
3054 }
3055 }
3056
3057 if let Some(account_id) = account_id {
3058 match self.index.account_orders.get(account_id) {
3059 Some(set) => sources.push(set),
3060 None => return FilterSources::Empty,
3061 }
3062 }
3063
3064 if sources.is_empty() {
3065 FilterSources::Unfiltered
3066 } else {
3067 FilterSources::Sets(sources)
3068 }
3069 }
3070
3071 fn collect_position_filter_sources<'a>(
3072 &'a self,
3073 venue: Option<&Venue>,
3074 instrument_id: Option<&InstrumentId>,
3075 strategy_id: Option<&StrategyId>,
3076 account_id: Option<&AccountId>,
3077 ) -> FilterSources<'a, PositionId> {
3078 let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
3079
3080 if let Some(venue) = venue {
3081 match self.index.venue_positions.get(venue) {
3082 Some(set) => sources.push(set),
3083 None => return FilterSources::Empty,
3084 }
3085 }
3086
3087 if let Some(instrument_id) = instrument_id {
3088 match self.index.instrument_positions.get(instrument_id) {
3089 Some(set) => sources.push(set),
3090 None => return FilterSources::Empty,
3091 }
3092 }
3093
3094 if let Some(strategy_id) = strategy_id {
3095 match self.index.strategy_positions.get(strategy_id) {
3096 Some(set) => sources.push(set),
3097 None => return FilterSources::Empty,
3098 }
3099 }
3100
3101 if let Some(account_id) = account_id {
3102 match self.index.account_positions.get(account_id) {
3103 Some(set) => sources.push(set),
3104 None => return FilterSources::Empty,
3105 }
3106 }
3107
3108 if sources.is_empty() {
3109 FilterSources::Unfiltered
3110 } else {
3111 FilterSources::Sets(sources)
3112 }
3113 }
3114
3115 fn query_orders_in_bucket(
3121 &self,
3122 bucket: &AHashSet<ClientOrderId>,
3123 venue: Option<&Venue>,
3124 instrument_id: Option<&InstrumentId>,
3125 strategy_id: Option<&StrategyId>,
3126 account_id: Option<&AccountId>,
3127 ) -> AHashSet<ClientOrderId> {
3128 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3129 FilterSources::Empty => AHashSet::new(),
3130 FilterSources::Unfiltered => bucket.clone(),
3131 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3132 }
3133 }
3134
3135 fn query_positions_in_bucket(
3136 &self,
3137 bucket: &AHashSet<PositionId>,
3138 venue: Option<&Venue>,
3139 instrument_id: Option<&InstrumentId>,
3140 strategy_id: Option<&StrategyId>,
3141 account_id: Option<&AccountId>,
3142 ) -> AHashSet<PositionId> {
3143 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3144 FilterSources::Empty => AHashSet::new(),
3145 FilterSources::Unfiltered => bucket.clone(),
3146 FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3147 }
3148 }
3149
3150 fn view_orders_in_bucket<'a>(
3153 &'a self,
3154 bucket: &'a AHashSet<ClientOrderId>,
3155 venue: Option<&Venue>,
3156 instrument_id: Option<&InstrumentId>,
3157 strategy_id: Option<&StrategyId>,
3158 account_id: Option<&AccountId>,
3159 ) -> Cow<'a, AHashSet<ClientOrderId>> {
3160 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3161 FilterSources::Empty => Cow::Owned(AHashSet::new()),
3162 FilterSources::Unfiltered => Cow::Borrowed(bucket),
3163 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3164 }
3165 }
3166
3167 fn view_positions_in_bucket<'a>(
3168 &'a self,
3169 bucket: &'a AHashSet<PositionId>,
3170 venue: Option<&Venue>,
3171 instrument_id: Option<&InstrumentId>,
3172 strategy_id: Option<&StrategyId>,
3173 account_id: Option<&AccountId>,
3174 ) -> Cow<'a, AHashSet<PositionId>> {
3175 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3176 FilterSources::Empty => Cow::Owned(AHashSet::new()),
3177 FilterSources::Unfiltered => Cow::Borrowed(bucket),
3178 FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3179 }
3180 }
3181
3182 fn iter_orders_in_bucket<'a>(
3187 &'a self,
3188 bucket: &'a AHashSet<ClientOrderId>,
3189 venue: Option<&Venue>,
3190 instrument_id: Option<&InstrumentId>,
3191 strategy_id: Option<&StrategyId>,
3192 account_id: Option<&AccountId>,
3193 ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
3194 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3195 FilterSources::Empty => Box::new(std::iter::empty()),
3196 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3197 FilterSources::Sets(mut sources) => {
3198 sources.push(bucket);
3199 sources.sort_unstable_by_key(|s| s.len());
3200 let driver = sources[0];
3201 let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
3202 Box::new(
3203 driver
3204 .iter()
3205 .copied()
3206 .filter(move |id| rest.iter().all(|s| s.contains(id))),
3207 )
3208 }
3209 }
3210 }
3211
3212 fn iter_positions_in_bucket<'a>(
3213 &'a self,
3214 bucket: &'a AHashSet<PositionId>,
3215 venue: Option<&Venue>,
3216 instrument_id: Option<&InstrumentId>,
3217 strategy_id: Option<&StrategyId>,
3218 account_id: Option<&AccountId>,
3219 ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
3220 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3221 FilterSources::Empty => Box::new(std::iter::empty()),
3222 FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3223 FilterSources::Sets(mut sources) => {
3224 sources.push(bucket);
3225 sources.sort_unstable_by_key(|s| s.len());
3226 let driver = sources[0];
3227 let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
3228 Box::new(
3229 driver
3230 .iter()
3231 .copied()
3232 .filter(move |id| rest.iter().all(|s| s.contains(id))),
3233 )
3234 }
3235 }
3236 }
3237
3238 fn count_orders_in_bucket(
3244 &self,
3245 bucket: &AHashSet<ClientOrderId>,
3246 venue: Option<&Venue>,
3247 instrument_id: Option<&InstrumentId>,
3248 strategy_id: Option<&StrategyId>,
3249 account_id: Option<&AccountId>,
3250 side: Option<OrderSide>,
3251 ) -> usize {
3252 let side = side.unwrap_or(OrderSide::NoOrderSide);
3253
3254 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3255 FilterSources::Empty => 0,
3256 FilterSources::Unfiltered => {
3257 if side == OrderSide::NoOrderSide {
3258 bucket.len()
3259 } else {
3260 bucket
3261 .iter()
3262 .filter(|id| self.order_side_matches(id, side))
3263 .count()
3264 }
3265 }
3266 FilterSources::Sets(mut sources) => {
3267 sources.push(bucket);
3268 sources.sort_unstable_by_key(|s| s.len());
3269 let driver = sources[0];
3270 let rest = &sources[1..];
3271
3272 driver
3273 .iter()
3274 .filter(|id| rest.iter().all(|s| s.contains(id)))
3275 .filter(|id| {
3276 side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
3277 })
3278 .count()
3279 }
3280 }
3281 }
3282
3283 fn count_positions_in_bucket(
3284 &self,
3285 bucket: &AHashSet<PositionId>,
3286 venue: Option<&Venue>,
3287 instrument_id: Option<&InstrumentId>,
3288 strategy_id: Option<&StrategyId>,
3289 account_id: Option<&AccountId>,
3290 side: Option<PositionSide>,
3291 ) -> usize {
3292 let side = side.unwrap_or(PositionSide::NoPositionSide);
3293
3294 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3295 FilterSources::Empty => 0,
3296 FilterSources::Unfiltered => {
3297 if side == PositionSide::NoPositionSide {
3298 bucket.len()
3299 } else {
3300 bucket
3301 .iter()
3302 .filter(|id| self.position_side_matches(id, side))
3303 .count()
3304 }
3305 }
3306 FilterSources::Sets(mut sources) => {
3307 sources.push(bucket);
3308 sources.sort_unstable_by_key(|s| s.len());
3309 let driver = sources[0];
3310 let rest = &sources[1..];
3311
3312 driver
3313 .iter()
3314 .filter(|id| rest.iter().all(|s| s.contains(id)))
3315 .filter(|id| {
3316 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3317 })
3318 .count()
3319 }
3320 }
3321 }
3322
3323 fn any_orders_in_bucket(
3329 &self,
3330 bucket: &AHashSet<ClientOrderId>,
3331 venue: Option<&Venue>,
3332 instrument_id: Option<&InstrumentId>,
3333 strategy_id: Option<&StrategyId>,
3334 account_id: Option<&AccountId>,
3335 side: Option<OrderSide>,
3336 ) -> bool {
3337 let side = side.unwrap_or(OrderSide::NoOrderSide);
3338
3339 match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3340 FilterSources::Empty => false,
3341 FilterSources::Unfiltered => {
3342 if side == OrderSide::NoOrderSide {
3343 !bucket.is_empty()
3344 } else {
3345 bucket.iter().any(|id| self.order_side_matches(id, side))
3346 }
3347 }
3348 FilterSources::Sets(mut sources) => {
3349 sources.push(bucket);
3350 sources.sort_unstable_by_key(|s| s.len());
3351 let driver = sources[0];
3352 let rest = &sources[1..];
3353
3354 driver
3355 .iter()
3356 .filter(|id| rest.iter().all(|s| s.contains(id)))
3357 .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
3358 }
3359 }
3360 }
3361
3362 fn any_positions_in_bucket(
3363 &self,
3364 bucket: &AHashSet<PositionId>,
3365 venue: Option<&Venue>,
3366 instrument_id: Option<&InstrumentId>,
3367 strategy_id: Option<&StrategyId>,
3368 account_id: Option<&AccountId>,
3369 side: Option<PositionSide>,
3370 ) -> bool {
3371 let side = side.unwrap_or(PositionSide::NoPositionSide);
3372
3373 match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3374 FilterSources::Empty => false,
3375 FilterSources::Unfiltered => {
3376 if side == PositionSide::NoPositionSide {
3377 !bucket.is_empty()
3378 } else {
3379 bucket.iter().any(|id| self.position_side_matches(id, side))
3380 }
3381 }
3382 FilterSources::Sets(mut sources) => {
3383 sources.push(bucket);
3384 sources.sort_unstable_by_key(|s| s.len());
3385 let driver = sources[0];
3386 let rest = &sources[1..];
3387
3388 driver
3389 .iter()
3390 .filter(|id| rest.iter().all(|s| s.contains(id)))
3391 .any(|id| {
3392 side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3393 })
3394 }
3395 }
3396 }
3397
3398 fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
3399 self.orders
3400 .get(client_order_id)
3401 .is_some_and(|cell| cell.borrow().order_side() == side)
3402 }
3403
3404 fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
3405 self.positions
3406 .get(position_id)
3407 .is_some_and(|cell| cell.borrow().side == side)
3408 }
3409
3410 fn get_orders_for_ids(
3416 &self,
3417 client_order_ids: &AHashSet<ClientOrderId>,
3418 side: Option<OrderSide>,
3419 ) -> Vec<OrderRef<'_>> {
3420 let side = side.unwrap_or(OrderSide::NoOrderSide);
3421 let mut orders = Vec::new();
3422
3423 for client_order_id in client_order_ids {
3424 let order_cell = self
3425 .orders
3426 .get(client_order_id)
3427 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
3428 let order = OrderRef::new(order_cell.borrow());
3429
3430 if side == OrderSide::NoOrderSide || side == order.order_side() {
3431 orders.push(order);
3432 }
3433 }
3434
3435 orders.sort_by_key(|o| o.client_order_id());
3438 orders
3439 }
3440
3441 fn get_positions_for_ids(
3451 &self,
3452 position_ids: &AHashSet<PositionId>,
3453 side: Option<PositionSide>,
3454 ) -> Vec<PositionRef<'_>> {
3455 let side = side.unwrap_or(PositionSide::NoPositionSide);
3456 let mut positions = Vec::new();
3457
3458 for position_id in position_ids {
3459 let position_cell = self
3460 .positions
3461 .get(position_id)
3462 .unwrap_or_else(|| panic!("Position {position_id} not found"));
3463 let position = PositionRef::new(position_cell.borrow());
3464
3465 if side == PositionSide::NoPositionSide || side == position.side {
3466 positions.push(position);
3467 }
3468 }
3469
3470 positions.sort_by_key(|p| p.id);
3473 positions
3474 }
3475
3476 #[must_use]
3478 pub fn client_order_ids(
3479 &self,
3480 venue: Option<&Venue>,
3481 instrument_id: Option<&InstrumentId>,
3482 strategy_id: Option<&StrategyId>,
3483 account_id: Option<&AccountId>,
3484 ) -> AHashSet<ClientOrderId> {
3485 self.query_orders_in_bucket(
3486 &self.index.orders,
3487 venue,
3488 instrument_id,
3489 strategy_id,
3490 account_id,
3491 )
3492 }
3493
3494 #[must_use]
3496 pub fn client_order_ids_open(
3497 &self,
3498 venue: Option<&Venue>,
3499 instrument_id: Option<&InstrumentId>,
3500 strategy_id: Option<&StrategyId>,
3501 account_id: Option<&AccountId>,
3502 ) -> AHashSet<ClientOrderId> {
3503 self.query_orders_in_bucket(
3504 &self.index.orders_open,
3505 venue,
3506 instrument_id,
3507 strategy_id,
3508 account_id,
3509 )
3510 }
3511
3512 #[must_use]
3514 pub fn client_order_ids_closed(
3515 &self,
3516 venue: Option<&Venue>,
3517 instrument_id: Option<&InstrumentId>,
3518 strategy_id: Option<&StrategyId>,
3519 account_id: Option<&AccountId>,
3520 ) -> AHashSet<ClientOrderId> {
3521 self.query_orders_in_bucket(
3522 &self.index.orders_closed,
3523 venue,
3524 instrument_id,
3525 strategy_id,
3526 account_id,
3527 )
3528 }
3529
3530 #[must_use]
3535 pub fn client_order_ids_active_local(
3536 &self,
3537 venue: Option<&Venue>,
3538 instrument_id: Option<&InstrumentId>,
3539 strategy_id: Option<&StrategyId>,
3540 account_id: Option<&AccountId>,
3541 ) -> AHashSet<ClientOrderId> {
3542 self.query_orders_in_bucket(
3543 &self.index.orders_active_local,
3544 venue,
3545 instrument_id,
3546 strategy_id,
3547 account_id,
3548 )
3549 }
3550
3551 #[must_use]
3553 pub fn client_order_ids_emulated(
3554 &self,
3555 venue: Option<&Venue>,
3556 instrument_id: Option<&InstrumentId>,
3557 strategy_id: Option<&StrategyId>,
3558 account_id: Option<&AccountId>,
3559 ) -> AHashSet<ClientOrderId> {
3560 self.query_orders_in_bucket(
3561 &self.index.orders_emulated,
3562 venue,
3563 instrument_id,
3564 strategy_id,
3565 account_id,
3566 )
3567 }
3568
3569 #[must_use]
3571 pub fn client_order_ids_inflight(
3572 &self,
3573 venue: Option<&Venue>,
3574 instrument_id: Option<&InstrumentId>,
3575 strategy_id: Option<&StrategyId>,
3576 account_id: Option<&AccountId>,
3577 ) -> AHashSet<ClientOrderId> {
3578 self.query_orders_in_bucket(
3579 &self.index.orders_inflight,
3580 venue,
3581 instrument_id,
3582 strategy_id,
3583 account_id,
3584 )
3585 }
3586
3587 #[must_use]
3589 pub fn position_ids(
3590 &self,
3591 venue: Option<&Venue>,
3592 instrument_id: Option<&InstrumentId>,
3593 strategy_id: Option<&StrategyId>,
3594 account_id: Option<&AccountId>,
3595 ) -> AHashSet<PositionId> {
3596 self.query_positions_in_bucket(
3597 &self.index.positions,
3598 venue,
3599 instrument_id,
3600 strategy_id,
3601 account_id,
3602 )
3603 }
3604
3605 #[must_use]
3607 pub fn position_open_ids(
3608 &self,
3609 venue: Option<&Venue>,
3610 instrument_id: Option<&InstrumentId>,
3611 strategy_id: Option<&StrategyId>,
3612 account_id: Option<&AccountId>,
3613 ) -> AHashSet<PositionId> {
3614 self.query_positions_in_bucket(
3615 &self.index.positions_open,
3616 venue,
3617 instrument_id,
3618 strategy_id,
3619 account_id,
3620 )
3621 }
3622
3623 #[must_use]
3625 pub fn position_closed_ids(
3626 &self,
3627 venue: Option<&Venue>,
3628 instrument_id: Option<&InstrumentId>,
3629 strategy_id: Option<&StrategyId>,
3630 account_id: Option<&AccountId>,
3631 ) -> AHashSet<PositionId> {
3632 self.query_positions_in_bucket(
3633 &self.index.positions_closed,
3634 venue,
3635 instrument_id,
3636 strategy_id,
3637 account_id,
3638 )
3639 }
3640
3641 #[must_use]
3648 pub fn client_order_ids_view(
3649 &self,
3650 venue: Option<&Venue>,
3651 instrument_id: Option<&InstrumentId>,
3652 strategy_id: Option<&StrategyId>,
3653 account_id: Option<&AccountId>,
3654 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3655 self.view_orders_in_bucket(
3656 &self.index.orders,
3657 venue,
3658 instrument_id,
3659 strategy_id,
3660 account_id,
3661 )
3662 }
3663
3664 #[must_use]
3666 pub fn client_order_ids_open_view(
3667 &self,
3668 venue: Option<&Venue>,
3669 instrument_id: Option<&InstrumentId>,
3670 strategy_id: Option<&StrategyId>,
3671 account_id: Option<&AccountId>,
3672 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3673 self.view_orders_in_bucket(
3674 &self.index.orders_open,
3675 venue,
3676 instrument_id,
3677 strategy_id,
3678 account_id,
3679 )
3680 }
3681
3682 #[must_use]
3684 pub fn client_order_ids_closed_view(
3685 &self,
3686 venue: Option<&Venue>,
3687 instrument_id: Option<&InstrumentId>,
3688 strategy_id: Option<&StrategyId>,
3689 account_id: Option<&AccountId>,
3690 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3691 self.view_orders_in_bucket(
3692 &self.index.orders_closed,
3693 venue,
3694 instrument_id,
3695 strategy_id,
3696 account_id,
3697 )
3698 }
3699
3700 #[must_use]
3702 pub fn client_order_ids_active_local_view(
3703 &self,
3704 venue: Option<&Venue>,
3705 instrument_id: Option<&InstrumentId>,
3706 strategy_id: Option<&StrategyId>,
3707 account_id: Option<&AccountId>,
3708 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3709 self.view_orders_in_bucket(
3710 &self.index.orders_active_local,
3711 venue,
3712 instrument_id,
3713 strategy_id,
3714 account_id,
3715 )
3716 }
3717
3718 #[must_use]
3720 pub fn client_order_ids_emulated_view(
3721 &self,
3722 venue: Option<&Venue>,
3723 instrument_id: Option<&InstrumentId>,
3724 strategy_id: Option<&StrategyId>,
3725 account_id: Option<&AccountId>,
3726 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3727 self.view_orders_in_bucket(
3728 &self.index.orders_emulated,
3729 venue,
3730 instrument_id,
3731 strategy_id,
3732 account_id,
3733 )
3734 }
3735
3736 #[must_use]
3738 pub fn client_order_ids_inflight_view(
3739 &self,
3740 venue: Option<&Venue>,
3741 instrument_id: Option<&InstrumentId>,
3742 strategy_id: Option<&StrategyId>,
3743 account_id: Option<&AccountId>,
3744 ) -> Cow<'_, AHashSet<ClientOrderId>> {
3745 self.view_orders_in_bucket(
3746 &self.index.orders_inflight,
3747 venue,
3748 instrument_id,
3749 strategy_id,
3750 account_id,
3751 )
3752 }
3753
3754 #[must_use]
3756 pub fn position_ids_view(
3757 &self,
3758 venue: Option<&Venue>,
3759 instrument_id: Option<&InstrumentId>,
3760 strategy_id: Option<&StrategyId>,
3761 account_id: Option<&AccountId>,
3762 ) -> Cow<'_, AHashSet<PositionId>> {
3763 self.view_positions_in_bucket(
3764 &self.index.positions,
3765 venue,
3766 instrument_id,
3767 strategy_id,
3768 account_id,
3769 )
3770 }
3771
3772 #[must_use]
3774 pub fn position_open_ids_view(
3775 &self,
3776 venue: Option<&Venue>,
3777 instrument_id: Option<&InstrumentId>,
3778 strategy_id: Option<&StrategyId>,
3779 account_id: Option<&AccountId>,
3780 ) -> Cow<'_, AHashSet<PositionId>> {
3781 self.view_positions_in_bucket(
3782 &self.index.positions_open,
3783 venue,
3784 instrument_id,
3785 strategy_id,
3786 account_id,
3787 )
3788 }
3789
3790 #[must_use]
3792 pub fn position_closed_ids_view(
3793 &self,
3794 venue: Option<&Venue>,
3795 instrument_id: Option<&InstrumentId>,
3796 strategy_id: Option<&StrategyId>,
3797 account_id: Option<&AccountId>,
3798 ) -> Cow<'_, AHashSet<PositionId>> {
3799 self.view_positions_in_bucket(
3800 &self.index.positions_closed,
3801 venue,
3802 instrument_id,
3803 strategy_id,
3804 account_id,
3805 )
3806 }
3807
3808 pub fn iter_client_order_ids(
3814 &self,
3815 venue: Option<&Venue>,
3816 instrument_id: Option<&InstrumentId>,
3817 strategy_id: Option<&StrategyId>,
3818 account_id: Option<&AccountId>,
3819 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3820 self.iter_orders_in_bucket(
3821 &self.index.orders,
3822 venue,
3823 instrument_id,
3824 strategy_id,
3825 account_id,
3826 )
3827 }
3828
3829 pub fn iter_client_order_ids_open(
3831 &self,
3832 venue: Option<&Venue>,
3833 instrument_id: Option<&InstrumentId>,
3834 strategy_id: Option<&StrategyId>,
3835 account_id: Option<&AccountId>,
3836 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3837 self.iter_orders_in_bucket(
3838 &self.index.orders_open,
3839 venue,
3840 instrument_id,
3841 strategy_id,
3842 account_id,
3843 )
3844 }
3845
3846 pub fn iter_client_order_ids_closed(
3848 &self,
3849 venue: Option<&Venue>,
3850 instrument_id: Option<&InstrumentId>,
3851 strategy_id: Option<&StrategyId>,
3852 account_id: Option<&AccountId>,
3853 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3854 self.iter_orders_in_bucket(
3855 &self.index.orders_closed,
3856 venue,
3857 instrument_id,
3858 strategy_id,
3859 account_id,
3860 )
3861 }
3862
3863 pub fn iter_client_order_ids_active_local(
3865 &self,
3866 venue: Option<&Venue>,
3867 instrument_id: Option<&InstrumentId>,
3868 strategy_id: Option<&StrategyId>,
3869 account_id: Option<&AccountId>,
3870 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3871 self.iter_orders_in_bucket(
3872 &self.index.orders_active_local,
3873 venue,
3874 instrument_id,
3875 strategy_id,
3876 account_id,
3877 )
3878 }
3879
3880 pub fn iter_client_order_ids_emulated(
3882 &self,
3883 venue: Option<&Venue>,
3884 instrument_id: Option<&InstrumentId>,
3885 strategy_id: Option<&StrategyId>,
3886 account_id: Option<&AccountId>,
3887 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3888 self.iter_orders_in_bucket(
3889 &self.index.orders_emulated,
3890 venue,
3891 instrument_id,
3892 strategy_id,
3893 account_id,
3894 )
3895 }
3896
3897 pub fn iter_client_order_ids_inflight(
3899 &self,
3900 venue: Option<&Venue>,
3901 instrument_id: Option<&InstrumentId>,
3902 strategy_id: Option<&StrategyId>,
3903 account_id: Option<&AccountId>,
3904 ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3905 self.iter_orders_in_bucket(
3906 &self.index.orders_inflight,
3907 venue,
3908 instrument_id,
3909 strategy_id,
3910 account_id,
3911 )
3912 }
3913
3914 pub fn iter_position_ids(
3916 &self,
3917 venue: Option<&Venue>,
3918 instrument_id: Option<&InstrumentId>,
3919 strategy_id: Option<&StrategyId>,
3920 account_id: Option<&AccountId>,
3921 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3922 self.iter_positions_in_bucket(
3923 &self.index.positions,
3924 venue,
3925 instrument_id,
3926 strategy_id,
3927 account_id,
3928 )
3929 }
3930
3931 pub fn iter_position_open_ids(
3933 &self,
3934 venue: Option<&Venue>,
3935 instrument_id: Option<&InstrumentId>,
3936 strategy_id: Option<&StrategyId>,
3937 account_id: Option<&AccountId>,
3938 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3939 self.iter_positions_in_bucket(
3940 &self.index.positions_open,
3941 venue,
3942 instrument_id,
3943 strategy_id,
3944 account_id,
3945 )
3946 }
3947
3948 pub fn iter_position_closed_ids(
3950 &self,
3951 venue: Option<&Venue>,
3952 instrument_id: Option<&InstrumentId>,
3953 strategy_id: Option<&StrategyId>,
3954 account_id: Option<&AccountId>,
3955 ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3956 self.iter_positions_in_bucket(
3957 &self.index.positions_closed,
3958 venue,
3959 instrument_id,
3960 strategy_id,
3961 account_id,
3962 )
3963 }
3964
3965 #[must_use]
3967 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
3968 self.index.actors.clone()
3969 }
3970
3971 #[must_use]
3973 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
3974 self.index.strategies.clone()
3975 }
3976
3977 #[must_use]
3979 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
3980 self.index.exec_algorithms.clone()
3981 }
3982
3983 #[must_use]
3992 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
3993 self.orders
3994 .get(client_order_id)
3995 .map(|order_cell| OrderRef::new(order_cell.borrow()))
3996 }
3997
3998 #[must_use]
4008 pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
4009 self.orders
4010 .get(client_order_id)
4011 .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
4012 }
4013
4014 #[must_use]
4019 pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
4020 self.orders
4021 .get(client_order_id)
4022 .map(|order_cell| order_cell.borrow().clone())
4023 }
4024
4025 #[must_use]
4027 pub fn orders_for_ids(
4028 &self,
4029 client_order_ids: &[ClientOrderId],
4030 context: &dyn Display,
4031 ) -> Vec<OrderAny> {
4032 let mut orders = Vec::with_capacity(client_order_ids.len());
4033 for id in client_order_ids {
4034 match self.orders.get(id) {
4035 Some(order_cell) => orders.push(order_cell.borrow().clone()),
4036 None => log::error!("Order {id} not found in cache for {context}"),
4037 }
4038 }
4039 orders
4040 }
4041
4042 #[must_use]
4044 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
4045 self.index.venue_order_ids.get(venue_order_id)
4046 }
4047
4048 #[must_use]
4050 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
4051 self.index.client_order_ids.get(client_order_id)
4052 }
4053
4054 #[must_use]
4056 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
4057 self.index.order_client.get(client_order_id)
4058 }
4059
4060 #[must_use]
4066 pub fn orders(
4067 &self,
4068 venue: Option<&Venue>,
4069 instrument_id: Option<&InstrumentId>,
4070 strategy_id: Option<&StrategyId>,
4071 account_id: Option<&AccountId>,
4072 side: Option<OrderSide>,
4073 ) -> Vec<OrderRef<'_>> {
4074 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
4075 self.get_orders_for_ids(&client_order_ids, side)
4076 }
4077
4078 #[must_use]
4080 pub fn orders_open(
4081 &self,
4082 venue: Option<&Venue>,
4083 instrument_id: Option<&InstrumentId>,
4084 strategy_id: Option<&StrategyId>,
4085 account_id: Option<&AccountId>,
4086 side: Option<OrderSide>,
4087 ) -> Vec<OrderRef<'_>> {
4088 let client_order_ids =
4089 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
4090 self.get_orders_for_ids(&client_order_ids, side)
4091 }
4092
4093 #[must_use]
4095 pub fn orders_closed(
4096 &self,
4097 venue: Option<&Venue>,
4098 instrument_id: Option<&InstrumentId>,
4099 strategy_id: Option<&StrategyId>,
4100 account_id: Option<&AccountId>,
4101 side: Option<OrderSide>,
4102 ) -> Vec<OrderRef<'_>> {
4103 let client_order_ids =
4104 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
4105 self.get_orders_for_ids(&client_order_ids, side)
4106 }
4107
4108 #[must_use]
4113 pub fn orders_active_local(
4114 &self,
4115 venue: Option<&Venue>,
4116 instrument_id: Option<&InstrumentId>,
4117 strategy_id: Option<&StrategyId>,
4118 account_id: Option<&AccountId>,
4119 side: Option<OrderSide>,
4120 ) -> Vec<OrderRef<'_>> {
4121 let client_order_ids =
4122 self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
4123 self.get_orders_for_ids(&client_order_ids, side)
4124 }
4125
4126 #[must_use]
4128 pub fn orders_emulated(
4129 &self,
4130 venue: Option<&Venue>,
4131 instrument_id: Option<&InstrumentId>,
4132 strategy_id: Option<&StrategyId>,
4133 account_id: Option<&AccountId>,
4134 side: Option<OrderSide>,
4135 ) -> Vec<OrderRef<'_>> {
4136 let client_order_ids =
4137 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
4138 self.get_orders_for_ids(&client_order_ids, side)
4139 }
4140
4141 #[must_use]
4143 pub fn orders_inflight(
4144 &self,
4145 venue: Option<&Venue>,
4146 instrument_id: Option<&InstrumentId>,
4147 strategy_id: Option<&StrategyId>,
4148 account_id: Option<&AccountId>,
4149 side: Option<OrderSide>,
4150 ) -> Vec<OrderRef<'_>> {
4151 let client_order_ids =
4152 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
4153 self.get_orders_for_ids(&client_order_ids, side)
4154 }
4155
4156 #[must_use]
4158 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
4159 match self.index.position_orders.get(position_id) {
4160 Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
4161 None => Vec::new(),
4162 }
4163 }
4164
4165 #[must_use]
4167 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
4168 self.index.orders.contains(client_order_id)
4169 }
4170
4171 #[must_use]
4173 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
4174 self.index.orders_open.contains(client_order_id)
4175 }
4176
4177 #[must_use]
4179 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
4180 self.index.orders_closed.contains(client_order_id)
4181 }
4182
4183 #[must_use]
4188 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
4189 self.index.orders_active_local.contains(client_order_id)
4190 }
4191
4192 #[must_use]
4194 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
4195 self.index.orders_emulated.contains(client_order_id)
4196 }
4197
4198 #[must_use]
4200 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
4201 self.index.orders_inflight.contains(client_order_id)
4202 }
4203
4204 #[must_use]
4206 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
4207 self.index.orders_pending_cancel.contains(client_order_id)
4208 }
4209
4210 #[must_use]
4212 pub fn orders_open_count(
4213 &self,
4214 venue: Option<&Venue>,
4215 instrument_id: Option<&InstrumentId>,
4216 strategy_id: Option<&StrategyId>,
4217 account_id: Option<&AccountId>,
4218 side: Option<OrderSide>,
4219 ) -> usize {
4220 self.count_orders_in_bucket(
4221 &self.index.orders_open,
4222 venue,
4223 instrument_id,
4224 strategy_id,
4225 account_id,
4226 side,
4227 )
4228 }
4229
4230 #[must_use]
4232 pub fn orders_closed_count(
4233 &self,
4234 venue: Option<&Venue>,
4235 instrument_id: Option<&InstrumentId>,
4236 strategy_id: Option<&StrategyId>,
4237 account_id: Option<&AccountId>,
4238 side: Option<OrderSide>,
4239 ) -> usize {
4240 self.count_orders_in_bucket(
4241 &self.index.orders_closed,
4242 venue,
4243 instrument_id,
4244 strategy_id,
4245 account_id,
4246 side,
4247 )
4248 }
4249
4250 #[must_use]
4255 pub fn orders_active_local_count(
4256 &self,
4257 venue: Option<&Venue>,
4258 instrument_id: Option<&InstrumentId>,
4259 strategy_id: Option<&StrategyId>,
4260 account_id: Option<&AccountId>,
4261 side: Option<OrderSide>,
4262 ) -> usize {
4263 self.count_orders_in_bucket(
4264 &self.index.orders_active_local,
4265 venue,
4266 instrument_id,
4267 strategy_id,
4268 account_id,
4269 side,
4270 )
4271 }
4272
4273 #[must_use]
4275 pub fn orders_emulated_count(
4276 &self,
4277 venue: Option<&Venue>,
4278 instrument_id: Option<&InstrumentId>,
4279 strategy_id: Option<&StrategyId>,
4280 account_id: Option<&AccountId>,
4281 side: Option<OrderSide>,
4282 ) -> usize {
4283 self.count_orders_in_bucket(
4284 &self.index.orders_emulated,
4285 venue,
4286 instrument_id,
4287 strategy_id,
4288 account_id,
4289 side,
4290 )
4291 }
4292
4293 #[must_use]
4295 pub fn orders_inflight_count(
4296 &self,
4297 venue: Option<&Venue>,
4298 instrument_id: Option<&InstrumentId>,
4299 strategy_id: Option<&StrategyId>,
4300 account_id: Option<&AccountId>,
4301 side: Option<OrderSide>,
4302 ) -> usize {
4303 self.count_orders_in_bucket(
4304 &self.index.orders_inflight,
4305 venue,
4306 instrument_id,
4307 strategy_id,
4308 account_id,
4309 side,
4310 )
4311 }
4312
4313 #[must_use]
4315 pub fn orders_total_count(
4316 &self,
4317 venue: Option<&Venue>,
4318 instrument_id: Option<&InstrumentId>,
4319 strategy_id: Option<&StrategyId>,
4320 account_id: Option<&AccountId>,
4321 side: Option<OrderSide>,
4322 ) -> usize {
4323 self.count_orders_in_bucket(
4324 &self.index.orders,
4325 venue,
4326 instrument_id,
4327 strategy_id,
4328 account_id,
4329 side,
4330 )
4331 }
4332
4333 #[must_use]
4339 pub fn has_orders_open(
4340 &self,
4341 venue: Option<&Venue>,
4342 instrument_id: Option<&InstrumentId>,
4343 strategy_id: Option<&StrategyId>,
4344 account_id: Option<&AccountId>,
4345 side: Option<OrderSide>,
4346 ) -> bool {
4347 self.any_orders_in_bucket(
4348 &self.index.orders_open,
4349 venue,
4350 instrument_id,
4351 strategy_id,
4352 account_id,
4353 side,
4354 )
4355 }
4356
4357 #[must_use]
4359 pub fn has_orders_closed(
4360 &self,
4361 venue: Option<&Venue>,
4362 instrument_id: Option<&InstrumentId>,
4363 strategy_id: Option<&StrategyId>,
4364 account_id: Option<&AccountId>,
4365 side: Option<OrderSide>,
4366 ) -> bool {
4367 self.any_orders_in_bucket(
4368 &self.index.orders_closed,
4369 venue,
4370 instrument_id,
4371 strategy_id,
4372 account_id,
4373 side,
4374 )
4375 }
4376
4377 #[must_use]
4381 pub fn has_orders_active_local(
4382 &self,
4383 venue: Option<&Venue>,
4384 instrument_id: Option<&InstrumentId>,
4385 strategy_id: Option<&StrategyId>,
4386 account_id: Option<&AccountId>,
4387 side: Option<OrderSide>,
4388 ) -> bool {
4389 self.any_orders_in_bucket(
4390 &self.index.orders_active_local,
4391 venue,
4392 instrument_id,
4393 strategy_id,
4394 account_id,
4395 side,
4396 )
4397 }
4398
4399 #[must_use]
4401 pub fn has_orders_emulated(
4402 &self,
4403 venue: Option<&Venue>,
4404 instrument_id: Option<&InstrumentId>,
4405 strategy_id: Option<&StrategyId>,
4406 account_id: Option<&AccountId>,
4407 side: Option<OrderSide>,
4408 ) -> bool {
4409 self.any_orders_in_bucket(
4410 &self.index.orders_emulated,
4411 venue,
4412 instrument_id,
4413 strategy_id,
4414 account_id,
4415 side,
4416 )
4417 }
4418
4419 #[must_use]
4421 pub fn has_orders_inflight(
4422 &self,
4423 venue: Option<&Venue>,
4424 instrument_id: Option<&InstrumentId>,
4425 strategy_id: Option<&StrategyId>,
4426 account_id: Option<&AccountId>,
4427 side: Option<OrderSide>,
4428 ) -> bool {
4429 self.any_orders_in_bucket(
4430 &self.index.orders_inflight,
4431 venue,
4432 instrument_id,
4433 strategy_id,
4434 account_id,
4435 side,
4436 )
4437 }
4438
4439 #[must_use]
4441 pub fn has_orders(
4442 &self,
4443 venue: Option<&Venue>,
4444 instrument_id: Option<&InstrumentId>,
4445 strategy_id: Option<&StrategyId>,
4446 account_id: Option<&AccountId>,
4447 side: Option<OrderSide>,
4448 ) -> bool {
4449 self.any_orders_in_bucket(
4450 &self.index.orders,
4451 venue,
4452 instrument_id,
4453 strategy_id,
4454 account_id,
4455 side,
4456 )
4457 }
4458
4459 #[must_use]
4461 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
4462 self.order_lists.get(order_list_id)
4463 }
4464
4465 #[must_use]
4467 pub fn order_lists(
4468 &self,
4469 venue: Option<&Venue>,
4470 instrument_id: Option<&InstrumentId>,
4471 strategy_id: Option<&StrategyId>,
4472 account_id: Option<&AccountId>,
4473 ) -> Vec<&OrderList> {
4474 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
4475
4476 if let Some(venue) = venue {
4477 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
4478 }
4479
4480 if let Some(instrument_id) = instrument_id {
4481 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
4482 }
4483
4484 if let Some(strategy_id) = strategy_id {
4485 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
4486 }
4487
4488 if let Some(account_id) = account_id {
4489 order_lists.retain(|ol| {
4490 ol.client_order_ids.iter().any(|client_order_id| {
4491 self.orders.get(client_order_id).is_some_and(|order_cell| {
4492 order_cell.borrow().account_id().as_ref() == Some(account_id)
4493 })
4494 })
4495 });
4496 }
4497
4498 order_lists
4499 }
4500
4501 #[must_use]
4503 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
4504 self.order_lists.contains_key(order_list_id)
4505 }
4506
4507 #[must_use]
4512 pub fn orders_for_exec_algorithm(
4513 &self,
4514 exec_algorithm_id: &ExecAlgorithmId,
4515 venue: Option<&Venue>,
4516 instrument_id: Option<&InstrumentId>,
4517 strategy_id: Option<&StrategyId>,
4518 account_id: Option<&AccountId>,
4519 side: Option<OrderSide>,
4520 ) -> Vec<OrderRef<'_>> {
4521 let Some(exec_algorithm_order_ids) =
4522 self.index.exec_algorithm_orders.get(exec_algorithm_id)
4523 else {
4524 return Vec::new();
4525 };
4526
4527 let filtered = self.query_orders_in_bucket(
4528 exec_algorithm_order_ids,
4529 venue,
4530 instrument_id,
4531 strategy_id,
4532 account_id,
4533 );
4534 self.get_orders_for_ids(&filtered, side)
4535 }
4536
4537 #[must_use]
4539 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
4540 match self.index.exec_spawn_orders.get(exec_spawn_id) {
4541 Some(ids) => self.get_orders_for_ids(ids, None),
4542 None => Vec::new(),
4543 }
4544 }
4545
4546 #[must_use]
4548 pub fn exec_spawn_total_quantity(
4549 &self,
4550 exec_spawn_id: &ClientOrderId,
4551 active_only: bool,
4552 ) -> Option<Quantity> {
4553 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4554
4555 let mut total_quantity: Option<Quantity> = None;
4556
4557 for spawn_order in exec_spawn_orders {
4558 if active_only && spawn_order.is_closed() {
4559 continue;
4560 }
4561
4562 match total_quantity.as_mut() {
4563 Some(total) => *total = *total + spawn_order.quantity(),
4564 None => total_quantity = Some(spawn_order.quantity()),
4565 }
4566 }
4567
4568 total_quantity
4569 }
4570
4571 #[must_use]
4573 pub fn exec_spawn_total_filled_qty(
4574 &self,
4575 exec_spawn_id: &ClientOrderId,
4576 active_only: bool,
4577 ) -> Option<Quantity> {
4578 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4579
4580 let mut total_quantity: Option<Quantity> = None;
4581
4582 for spawn_order in exec_spawn_orders {
4583 if active_only && spawn_order.is_closed() {
4584 continue;
4585 }
4586
4587 match total_quantity.as_mut() {
4588 Some(total) => *total = *total + spawn_order.filled_qty(),
4589 None => total_quantity = Some(spawn_order.filled_qty()),
4590 }
4591 }
4592
4593 total_quantity
4594 }
4595
4596 #[must_use]
4598 pub fn exec_spawn_total_leaves_qty(
4599 &self,
4600 exec_spawn_id: &ClientOrderId,
4601 active_only: bool,
4602 ) -> Option<Quantity> {
4603 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4604
4605 let mut total_quantity: Option<Quantity> = None;
4606
4607 for spawn_order in exec_spawn_orders {
4608 if active_only && spawn_order.is_closed() {
4609 continue;
4610 }
4611
4612 match total_quantity.as_mut() {
4613 Some(total) => *total = *total + spawn_order.leaves_qty(),
4614 None => total_quantity = Some(spawn_order.leaves_qty()),
4615 }
4616 }
4617
4618 total_quantity
4619 }
4620
4621 #[must_use]
4625 pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
4626 self.positions
4627 .get(position_id)
4628 .map(|position_cell| PositionRef::new(position_cell.borrow()))
4629 }
4630
4631 #[must_use]
4641 pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
4642 self.positions
4643 .get(position_id)
4644 .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
4645 }
4646
4647 #[must_use]
4652 pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
4653 self.positions
4654 .get(position_id)
4655 .map(|position_cell| position_cell.borrow().clone())
4656 }
4657
4658 #[must_use]
4660 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
4661 self.index
4662 .order_position
4663 .get(client_order_id)
4664 .and_then(|position_id| self.positions.get(position_id))
4665 .map(|position_cell| PositionRef::new(position_cell.borrow()))
4666 }
4667
4668 #[must_use]
4670 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
4671 self.index.order_position.get(client_order_id)
4672 }
4673
4674 #[must_use]
4680 pub fn positions(
4681 &self,
4682 venue: Option<&Venue>,
4683 instrument_id: Option<&InstrumentId>,
4684 strategy_id: Option<&StrategyId>,
4685 account_id: Option<&AccountId>,
4686 side: Option<PositionSide>,
4687 ) -> Vec<PositionRef<'_>> {
4688 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
4689 self.get_positions_for_ids(&position_ids, side)
4690 }
4691
4692 #[must_use]
4694 pub fn positions_open(
4695 &self,
4696 venue: Option<&Venue>,
4697 instrument_id: Option<&InstrumentId>,
4698 strategy_id: Option<&StrategyId>,
4699 account_id: Option<&AccountId>,
4700 side: Option<PositionSide>,
4701 ) -> Vec<PositionRef<'_>> {
4702 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
4703 self.get_positions_for_ids(&position_ids, side)
4704 }
4705
4706 #[must_use]
4708 pub fn positions_closed(
4709 &self,
4710 venue: Option<&Venue>,
4711 instrument_id: Option<&InstrumentId>,
4712 strategy_id: Option<&StrategyId>,
4713 account_id: Option<&AccountId>,
4714 side: Option<PositionSide>,
4715 ) -> Vec<PositionRef<'_>> {
4716 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
4717 self.get_positions_for_ids(&position_ids, side)
4718 }
4719
4720 #[must_use]
4722 pub fn position_exists(&self, position_id: &PositionId) -> bool {
4723 self.index.positions.contains(position_id)
4724 }
4725
4726 #[must_use]
4728 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
4729 self.index.positions_open.contains(position_id)
4730 }
4731
4732 #[must_use]
4734 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
4735 self.index.positions_closed.contains(position_id)
4736 }
4737
4738 #[must_use]
4740 pub fn positions_open_count(
4741 &self,
4742 venue: Option<&Venue>,
4743 instrument_id: Option<&InstrumentId>,
4744 strategy_id: Option<&StrategyId>,
4745 account_id: Option<&AccountId>,
4746 side: Option<PositionSide>,
4747 ) -> usize {
4748 self.count_positions_in_bucket(
4749 &self.index.positions_open,
4750 venue,
4751 instrument_id,
4752 strategy_id,
4753 account_id,
4754 side,
4755 )
4756 }
4757
4758 #[must_use]
4760 pub fn positions_closed_count(
4761 &self,
4762 venue: Option<&Venue>,
4763 instrument_id: Option<&InstrumentId>,
4764 strategy_id: Option<&StrategyId>,
4765 account_id: Option<&AccountId>,
4766 side: Option<PositionSide>,
4767 ) -> usize {
4768 self.count_positions_in_bucket(
4769 &self.index.positions_closed,
4770 venue,
4771 instrument_id,
4772 strategy_id,
4773 account_id,
4774 side,
4775 )
4776 }
4777
4778 #[must_use]
4780 pub fn positions_total_count(
4781 &self,
4782 venue: Option<&Venue>,
4783 instrument_id: Option<&InstrumentId>,
4784 strategy_id: Option<&StrategyId>,
4785 account_id: Option<&AccountId>,
4786 side: Option<PositionSide>,
4787 ) -> usize {
4788 self.count_positions_in_bucket(
4789 &self.index.positions,
4790 venue,
4791 instrument_id,
4792 strategy_id,
4793 account_id,
4794 side,
4795 )
4796 }
4797
4798 #[must_use]
4804 pub fn has_positions_open(
4805 &self,
4806 venue: Option<&Venue>,
4807 instrument_id: Option<&InstrumentId>,
4808 strategy_id: Option<&StrategyId>,
4809 account_id: Option<&AccountId>,
4810 side: Option<PositionSide>,
4811 ) -> bool {
4812 self.any_positions_in_bucket(
4813 &self.index.positions_open,
4814 venue,
4815 instrument_id,
4816 strategy_id,
4817 account_id,
4818 side,
4819 )
4820 }
4821
4822 #[must_use]
4824 pub fn has_positions_closed(
4825 &self,
4826 venue: Option<&Venue>,
4827 instrument_id: Option<&InstrumentId>,
4828 strategy_id: Option<&StrategyId>,
4829 account_id: Option<&AccountId>,
4830 side: Option<PositionSide>,
4831 ) -> bool {
4832 self.any_positions_in_bucket(
4833 &self.index.positions_closed,
4834 venue,
4835 instrument_id,
4836 strategy_id,
4837 account_id,
4838 side,
4839 )
4840 }
4841
4842 #[must_use]
4844 pub fn has_positions(
4845 &self,
4846 venue: Option<&Venue>,
4847 instrument_id: Option<&InstrumentId>,
4848 strategy_id: Option<&StrategyId>,
4849 account_id: Option<&AccountId>,
4850 side: Option<PositionSide>,
4851 ) -> bool {
4852 self.any_positions_in_bucket(
4853 &self.index.positions,
4854 venue,
4855 instrument_id,
4856 strategy_id,
4857 account_id,
4858 side,
4859 )
4860 }
4861
4862 #[must_use]
4866 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
4867 self.index.order_strategy.get(client_order_id)
4868 }
4869
4870 #[must_use]
4872 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
4873 self.index.position_strategy.get(position_id)
4874 }
4875
4876 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
4884 check_valid_string_ascii(key, stringify!(key))?;
4885
4886 Ok(self.general.get(key))
4887 }
4888
4889 #[must_use]
4893 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
4894 match price_type {
4895 PriceType::Bid => self
4896 .quotes
4897 .get(instrument_id)
4898 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
4899 PriceType::Ask => self
4900 .quotes
4901 .get(instrument_id)
4902 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
4903 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
4904 quotes.front().map(|quote| {
4905 Price::new(
4906 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
4907 quote.bid_price.precision + 1,
4908 )
4909 })
4910 }),
4911 PriceType::Last => self
4912 .trades
4913 .get(instrument_id)
4914 .and_then(|trades| trades.front().map(|trade| trade.price)),
4915 PriceType::Mark => self
4916 .mark_prices
4917 .get(instrument_id)
4918 .and_then(|marks| marks.front().map(|mark| mark.value)),
4919 }
4920 }
4921
4922 #[must_use]
4924 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
4925 self.quotes
4926 .get(instrument_id)
4927 .map(|quotes| quotes.iter().copied().collect())
4928 }
4929
4930 #[must_use]
4932 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
4933 self.trades
4934 .get(instrument_id)
4935 .map(|trades| trades.iter().copied().collect())
4936 }
4937
4938 #[must_use]
4940 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
4941 self.mark_prices
4942 .get(instrument_id)
4943 .map(|mark_prices| mark_prices.iter().copied().collect())
4944 }
4945
4946 #[must_use]
4948 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
4949 self.index_prices
4950 .get(instrument_id)
4951 .map(|index_prices| index_prices.iter().copied().collect())
4952 }
4953
4954 #[must_use]
4956 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
4957 self.funding_rates
4958 .get(instrument_id)
4959 .map(|funding_rates| funding_rates.iter().copied().collect())
4960 }
4961
4962 #[must_use]
4964 pub fn instrument_statuses(
4965 &self,
4966 instrument_id: &InstrumentId,
4967 ) -> Option<Vec<InstrumentStatus>> {
4968 self.instrument_statuses
4969 .get(instrument_id)
4970 .map(|statuses| statuses.iter().copied().collect())
4971 }
4972
4973 #[must_use]
4975 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
4976 self.bars
4977 .get(bar_type)
4978 .map(|bars| bars.iter().copied().collect())
4979 }
4980
4981 #[must_use]
4983 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
4984 self.books.get(instrument_id)
4985 }
4986
4987 #[must_use]
4989 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
4990 self.books.get_mut(instrument_id)
4991 }
4992
4993 #[must_use]
4995 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
4996 self.own_books.get(instrument_id)
4997 }
4998
4999 #[must_use]
5001 pub fn own_order_book_mut(
5002 &mut self,
5003 instrument_id: &InstrumentId,
5004 ) -> Option<&mut OwnOrderBook> {
5005 self.own_books.get_mut(instrument_id)
5006 }
5007
5008 #[must_use]
5010 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
5011 self.quotes
5012 .get(instrument_id)
5013 .and_then(|quotes| quotes.front())
5014 }
5015
5016 #[must_use]
5020 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
5021 self.quotes
5022 .get(instrument_id)
5023 .and_then(|quotes| quotes.get(index))
5024 }
5025
5026 #[must_use]
5028 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
5029 self.trades
5030 .get(instrument_id)
5031 .and_then(|trades| trades.front())
5032 }
5033
5034 #[must_use]
5038 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
5039 self.trades
5040 .get(instrument_id)
5041 .and_then(|trades| trades.get(index))
5042 }
5043
5044 #[must_use]
5046 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
5047 self.mark_prices
5048 .get(instrument_id)
5049 .and_then(|mark_prices| mark_prices.front())
5050 }
5051
5052 #[must_use]
5054 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
5055 self.index_prices
5056 .get(instrument_id)
5057 .and_then(|index_prices| index_prices.front())
5058 }
5059
5060 #[must_use]
5062 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
5063 self.funding_rates
5064 .get(instrument_id)
5065 .and_then(|funding_rates| funding_rates.front())
5066 }
5067
5068 #[must_use]
5070 pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
5071 self.instrument_statuses
5072 .get(instrument_id)
5073 .and_then(|statuses| statuses.front())
5074 }
5075
5076 #[must_use]
5078 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
5079 self.bars.get(bar_type).and_then(|bars| bars.front())
5080 }
5081
5082 #[must_use]
5086 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
5087 self.bars.get(bar_type).and_then(|bars| bars.get(index))
5088 }
5089
5090 #[must_use]
5092 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
5093 self.books
5094 .get(instrument_id)
5095 .map_or(0, |book| book.update_count) as usize
5096 }
5097
5098 #[must_use]
5100 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
5101 self.quotes
5102 .get(instrument_id)
5103 .map_or(0, std::collections::VecDeque::len)
5104 }
5105
5106 #[must_use]
5108 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
5109 self.trades
5110 .get(instrument_id)
5111 .map_or(0, std::collections::VecDeque::len)
5112 }
5113
5114 #[must_use]
5116 pub fn bar_count(&self, bar_type: &BarType) -> usize {
5117 self.bars
5118 .get(bar_type)
5119 .map_or(0, std::collections::VecDeque::len)
5120 }
5121
5122 #[must_use]
5124 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
5125 self.books.contains_key(instrument_id)
5126 }
5127
5128 #[must_use]
5130 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
5131 self.quote_count(instrument_id) > 0
5132 }
5133
5134 #[must_use]
5136 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
5137 self.trade_count(instrument_id) > 0
5138 }
5139
5140 #[must_use]
5142 pub fn has_bars(&self, bar_type: &BarType) -> bool {
5143 self.bar_count(bar_type) > 0
5144 }
5145
5146 #[must_use]
5147 pub fn get_xrate(
5148 &self,
5149 venue: Venue,
5150 from_currency: Currency,
5151 to_currency: Currency,
5152 price_type: PriceType,
5153 ) -> Option<f64> {
5154 if from_currency == to_currency {
5155 return Some(1.0);
5158 }
5159
5160 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
5161
5162 match get_exchange_rate(
5163 from_currency.code,
5164 to_currency.code,
5165 price_type,
5166 bid_quote,
5167 ask_quote,
5168 ) {
5169 Ok(rate) => rate,
5170 Err(e) => {
5171 log::error!("Failed to calculate xrate: {e}");
5172 None
5173 }
5174 }
5175 }
5176
5177 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
5178 let mut bid_quotes = AHashMap::new();
5179 let mut ask_quotes = AHashMap::new();
5180
5181 for instrument_id in self.instruments.keys() {
5182 if instrument_id.venue != *venue {
5183 continue;
5184 }
5185
5186 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
5187 if let Some(tick) = ticks.front() {
5188 (tick.bid_price, tick.ask_price)
5189 } else {
5190 continue; }
5192 } else {
5193 let bid_bar = self
5194 .bars
5195 .iter()
5196 .find(|(k, _)| {
5197 k.instrument_id() == *instrument_id
5198 && matches!(k.spec().price_type, PriceType::Bid)
5199 })
5200 .map(|(_, v)| v);
5201
5202 let ask_bar = self
5203 .bars
5204 .iter()
5205 .find(|(k, _)| {
5206 k.instrument_id() == *instrument_id
5207 && matches!(k.spec().price_type, PriceType::Ask)
5208 })
5209 .map(|(_, v)| v);
5210
5211 match (bid_bar, ask_bar) {
5212 (Some(bid), Some(ask)) => {
5213 match (bid.front(), ask.front()) {
5214 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
5215 _ => {
5216 continue;
5218 }
5219 }
5220 }
5221 _ => continue,
5222 }
5223 };
5224
5225 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
5226 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
5227 }
5228
5229 (bid_quotes, ask_quotes)
5230 }
5231
5232 #[must_use]
5234 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
5235 self.mark_xrates.get(&(from_currency, to_currency)).copied()
5236 }
5237
5238 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
5244 assert!(xrate > 0.0, "xrate was zero");
5245 self.mark_xrates.insert((from_currency, to_currency), xrate);
5246 self.mark_xrates
5247 .insert((to_currency, from_currency), 1.0 / xrate);
5248 }
5249
5250 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
5252 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
5253 }
5254
5255 pub fn clear_mark_xrates(&mut self) {
5257 self.mark_xrates.clear();
5258 }
5259
5260 #[must_use]
5264 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
5265 self.instruments.get(instrument_id)
5266 }
5267
5268 #[must_use]
5270 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
5271 match venue {
5272 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
5273 None => self.instruments.keys().collect(),
5274 }
5275 }
5276
5277 #[must_use]
5279 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
5280 self.instruments
5281 .values()
5282 .filter(|i| &i.id().venue == venue)
5283 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
5284 .collect()
5285 }
5286
5287 #[must_use]
5294 pub fn instruments_by_parent(
5295 &self,
5296 venue: &Venue,
5297 root: &Ustr,
5298 class: InstrumentClass,
5299 ) -> Vec<&InstrumentAny> {
5300 self.instruments
5301 .values()
5302 .filter(|i| &i.id().venue == venue)
5303 .filter(|i| i.underlying() == Some(*root))
5304 .filter(|i| i.instrument_class() == class)
5305 .collect()
5306 }
5307
5308 #[must_use]
5310 pub fn bar_types(
5311 &self,
5312 instrument_id: Option<&InstrumentId>,
5313 price_type: Option<&PriceType>,
5314 aggregation_source: AggregationSource,
5315 ) -> Vec<&BarType> {
5316 let mut bar_types = self
5317 .bars
5318 .keys()
5319 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
5320 .collect::<Vec<&BarType>>();
5321
5322 if let Some(instrument_id) = instrument_id {
5323 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
5324 }
5325
5326 if let Some(price_type) = price_type {
5327 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
5328 }
5329
5330 bar_types
5331 }
5332
5333 #[must_use]
5337 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
5338 self.synthetics.get(instrument_id)
5339 }
5340
5341 #[must_use]
5343 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
5344 self.synthetics.keys().collect()
5345 }
5346
5347 #[must_use]
5349 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
5350 self.synthetics.values().collect()
5351 }
5352
5353 #[must_use]
5357 pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
5358 self.accounts
5359 .get(account_id)
5360 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5361 }
5362
5363 #[must_use]
5373 pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
5374 self.accounts
5375 .get(account_id)
5376 .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
5377 }
5378
5379 #[must_use]
5384 pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
5385 self.accounts
5386 .get(account_id)
5387 .map(|account_cell| account_cell.borrow().clone())
5388 }
5389
5390 #[must_use]
5392 pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
5393 self.index
5394 .venue_account
5395 .get(venue)
5396 .and_then(|account_id| self.accounts.get(account_id))
5397 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5398 }
5399
5400 #[must_use]
5405 pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
5406 self.index
5407 .venue_account
5408 .get(venue)
5409 .and_then(|account_id| self.accounts.get(account_id))
5410 .map(|account_cell| account_cell.borrow().clone())
5411 }
5412
5413 #[must_use]
5415 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
5416 self.index.venue_account.get(venue)
5417 }
5418
5419 #[must_use]
5425 pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
5426 self.accounts
5427 .values()
5428 .filter(|account_cell| &account_cell.borrow().id() == account_id)
5429 .map(|account_cell| AccountRef::new(account_cell.borrow()))
5430 .collect()
5431 }
5432
5433 pub fn update_own_order_book(&mut self, order: &OrderAny) {
5441 if !order.has_price() {
5442 return;
5443 }
5444
5445 let instrument_id = order.instrument_id();
5446
5447 if !self.own_books.contains_key(&instrument_id) {
5448 if order.is_closed() {
5449 return;
5450 }
5451
5452 self.own_books
5453 .insert(instrument_id, OwnOrderBook::new(instrument_id));
5454 }
5455
5456 let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
5457 return;
5458 };
5459
5460 let own_book_order = order.to_own_book_order();
5461
5462 if order.is_closed() {
5463 if let Err(e) = own_book.delete(own_book_order) {
5464 log::debug!(
5465 "Failed to delete order {} from own book: {e}",
5466 order.client_order_id(),
5467 );
5468 } else {
5469 log::debug!("Deleted order {} from own book", order.client_order_id());
5470 }
5471 } else {
5472 if let Err(e) = own_book.update(own_book_order) {
5474 log::debug!(
5475 "Failed to update order {} in own book: {e}; inserting instead",
5476 order.client_order_id(),
5477 );
5478 own_book.add(own_book_order);
5479 }
5480 log::debug!("Updated order {} in own book", order.client_order_id());
5481 }
5482 }
5483
5484 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
5490 let Some(order_cell) = self.orders.get(client_order_id) else {
5491 return;
5492 };
5493 let order = order_cell.borrow();
5494 let instrument_id = order.instrument_id();
5495 let own_book_order = if order.has_price() {
5496 Some(order.to_own_book_order())
5497 } else {
5498 None
5499 };
5500 drop(order);
5501
5502 self.index.orders_open.remove(client_order_id);
5503 self.index.orders_pending_cancel.remove(client_order_id);
5504 self.index.orders_inflight.remove(client_order_id);
5505 self.index.orders_emulated.remove(client_order_id);
5506 self.index.orders_active_local.remove(client_order_id);
5507
5508 if let Some(own_book) = self.own_books.get_mut(&instrument_id)
5509 && let Some(own_book_order) = own_book_order
5510 {
5511 if let Err(e) = own_book.delete(own_book_order) {
5512 log::debug!("Could not force delete {client_order_id} from own book: {e}");
5513 } else {
5514 log::debug!("Force deleted {client_order_id} from own book");
5515 }
5516 }
5517
5518 self.index.orders_closed.insert(*client_order_id);
5519 }
5520
5521 pub fn audit_own_order_books(&mut self) {
5528 log::debug!("Starting own books audit");
5529 let start = std::time::Instant::now();
5530
5531 let valid_order_ids: AHashSet<ClientOrderId> = self
5534 .index
5535 .orders_open
5536 .union(&self.index.orders_inflight)
5537 .copied()
5538 .collect();
5539
5540 for own_book in self.own_books.values_mut() {
5541 own_book.audit_open_orders(&valid_order_ids);
5542 }
5543
5544 log::debug!("Completed own books audit in {:?}", start.elapsed());
5545 }
5546}
5547
5548fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
5549 let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
5550 anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
5551 };
5552
5553 let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
5554 anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
5555 };
5556
5557 if position_id.is_empty() {
5558 anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
5559 }
5560
5561 let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
5562 anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
5563 })?;
5564
5565 Ok((PositionId::new(position_id), snapshot_index))
5566}
5567
5568fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
5569 let snapshot = serde_json::from_slice::<Position>(blob)?;
5570 let expected_prefix = format!("{}-", position_id.as_str());
5571
5572 let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
5573 anyhow::bail!(
5574 "position snapshot id {} does not match blob_ref position {position_id}",
5575 snapshot.id
5576 );
5577 };
5578
5579 if UUID4::from_str(snapshot_uuid).is_err() {
5580 anyhow::bail!(
5581 "position snapshot id {} does not match blob_ref position {position_id}",
5582 snapshot.id
5583 );
5584 }
5585
5586 Ok(())
5587}