1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31 collections::VecDeque,
32 fmt::{Debug, Display},
33 time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42 UUID4, UnixNanos,
43 correctness::{
44 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45 check_valid_string_ascii,
46 },
47 datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50 accounts::{Account, AccountAny},
51 data::{
52 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53 TradeTick, YieldCurveData, option_chain::OptionGreeks,
54 },
55 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56 identifiers::{
57 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59 },
60 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61 orderbook::{
62 OrderBook,
63 own::{OwnOrderBook, should_handle_own_book_order},
64 },
65 orders::{Order, OrderAny, OrderList},
66 position::Position,
67 types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
/// In-memory cache of trading state.
///
/// Holds general key/value bytes, currencies, instruments, market data, accounts,
/// orders and positions, together with a [`CacheIndex`] of lookups built over them.
/// An optional [`CacheDatabaseAdapter`] can be attached; the `cache_*` methods load
/// its contents into the in-memory maps.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    /// Cache configuration (defaulted when none is supplied to `new`).
    config: CacheConfig,
    /// Secondary lookups over orders, positions, accounts and strategies.
    index: CacheIndex,
    /// Optional database adapter used for loading, flushing and persistence.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General-purpose byte values keyed by string.
    general: AHashMap<String, Bytes>,
    /// Currencies keyed by an interned string (presumably the currency code; verify).
    currencies: AHashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: AHashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    /// Quote ticks per instrument.
    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trade ticks per instrument.
    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
    /// Exchange rates keyed by a currency pair.
    // NOTE(review): presumably derived from mark prices - confirm against the setter.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    /// Mark price updates per instrument.
    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price updates per instrument.
    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// Funding rate updates per instrument.
    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
    /// Bars per bar type.
    bars: AHashMap<BarType, VecDeque<Bar>>,
    /// Greeks data keyed by instrument ID.
    greeks: AHashMap<InstrumentId, GreeksData>,
    /// Option greeks keyed by instrument ID.
    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
    /// Yield curve data keyed by string (presumably the curve name; verify).
    yield_curves: AHashMap<String, YieldCurveData>,
    /// Accounts keyed by account ID.
    accounts: AHashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: AHashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: AHashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: AHashMap<PositionId, Position>,
    /// Position snapshot bytes keyed by position ID.
    position_snapshots: AHashMap<PositionId, Bytes>,
    /// DeFi-specific cache state (only with the `defi` feature).
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
106
107impl Debug for Cache {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct(stringify!(Cache))
110 .field("config", &self.config)
111 .field("index", &self.index)
112 .field("general", &self.general)
113 .field("currencies", &self.currencies)
114 .field("instruments", &self.instruments)
115 .field("synthetics", &self.synthetics)
116 .field("books", &self.books)
117 .field("own_books", &self.own_books)
118 .field("quotes", &self.quotes)
119 .field("trades", &self.trades)
120 .field("mark_xrates", &self.mark_xrates)
121 .field("mark_prices", &self.mark_prices)
122 .field("index_prices", &self.index_prices)
123 .field("funding_rates", &self.funding_rates)
124 .field("bars", &self.bars)
125 .field("greeks", &self.greeks)
126 .field("option_greeks", &self.option_greeks)
127 .field("yield_curves", &self.yield_curves)
128 .field("accounts", &self.accounts)
129 .field("orders", &self.orders)
130 .field("order_lists", &self.order_lists)
131 .field("positions", &self.positions)
132 .field("position_snapshots", &self.position_snapshots)
133 .finish()
134 }
135}
136
137impl Default for Cache {
138 fn default() -> Self {
140 Self::new(Some(CacheConfig::default()), None)
141 }
142}
143
144impl Cache {
145 #[must_use]
147 pub fn new(
151 config: Option<CacheConfig>,
152 database: Option<Box<dyn CacheDatabaseAdapter>>,
153 ) -> Self {
154 Self {
155 config: config.unwrap_or_default(),
156 index: CacheIndex::default(),
157 database,
158 general: AHashMap::new(),
159 currencies: AHashMap::new(),
160 instruments: AHashMap::new(),
161 synthetics: AHashMap::new(),
162 books: AHashMap::new(),
163 own_books: AHashMap::new(),
164 quotes: AHashMap::new(),
165 trades: AHashMap::new(),
166 mark_xrates: AHashMap::new(),
167 mark_prices: AHashMap::new(),
168 index_prices: AHashMap::new(),
169 funding_rates: AHashMap::new(),
170 bars: AHashMap::new(),
171 greeks: AHashMap::new(),
172 option_greeks: AHashMap::new(),
173 yield_curves: AHashMap::new(),
174 accounts: AHashMap::new(),
175 orders: AHashMap::new(),
176 order_lists: AHashMap::new(),
177 positions: AHashMap::new(),
178 position_snapshots: AHashMap::new(),
179 #[cfg(feature = "defi")]
180 defi: crate::defi::cache::DefiCache::default(),
181 }
182 }
183
184 #[must_use]
186 pub fn memory_address(&self) -> String {
187 format!("{:?}", std::ptr::from_ref(self))
188 }
189
190 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
194 let type_name = std::any::type_name_of_val(&*database);
195 log::info!("Cache database adapter set: {type_name}");
196 self.database = Some(database);
197 }
198
199 pub fn cache_general(&mut self) -> anyhow::Result<()> {
207 self.general = match &mut self.database {
208 Some(db) => db.load()?,
209 None => AHashMap::new(),
210 };
211
212 log::info!(
213 "Cached {} general object(s) from database",
214 self.general.len()
215 );
216 Ok(())
217 }
218
219 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
225 let cache_map = match &self.database {
226 Some(db) => db.load_all().await?,
227 None => CacheMap::default(),
228 };
229
230 self.currencies = cache_map.currencies;
231 self.instruments = cache_map.instruments;
232 self.synthetics = cache_map.synthetics;
233 self.accounts = cache_map.accounts;
234 self.orders = cache_map.orders;
235 self.positions = cache_map.positions;
236 Ok(())
237 }
238
239 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
245 self.currencies = match &mut self.database {
246 Some(db) => db.load_currencies().await?,
247 None => AHashMap::new(),
248 };
249
250 log::info!("Cached {} currencies from database", self.general.len());
251 Ok(())
252 }
253
254 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
260 self.instruments = match &mut self.database {
261 Some(db) => db.load_instruments().await?,
262 None => AHashMap::new(),
263 };
264
265 log::info!("Cached {} instruments from database", self.general.len());
266 Ok(())
267 }
268
269 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
275 self.synthetics = match &mut self.database {
276 Some(db) => db.load_synthetics().await?,
277 None => AHashMap::new(),
278 };
279
280 log::info!(
281 "Cached {} synthetic instruments from database",
282 self.general.len()
283 );
284 Ok(())
285 }
286
287 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
293 self.accounts = match &mut self.database {
294 Some(db) => db.load_accounts().await?,
295 None => AHashMap::new(),
296 };
297
298 log::info!(
299 "Cached {} synthetic instruments from database",
300 self.general.len()
301 );
302 Ok(())
303 }
304
305 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
311 self.orders = match &mut self.database {
312 Some(db) => db.load_orders().await?,
313 None => AHashMap::new(),
314 };
315
316 log::info!("Cached {} orders from database", self.general.len());
317 Ok(())
318 }
319
320 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
326 self.positions = match &mut self.database {
327 Some(db) => db.load_positions().await?,
328 None => AHashMap::new(),
329 };
330
331 log::info!("Cached {} positions from database", self.general.len());
332 Ok(())
333 }
334
335 pub fn build_index(&mut self) {
337 log::debug!("Building index");
338
339 for account_id in self.accounts.keys() {
341 self.index
342 .venue_account
343 .insert(account_id.get_issuer(), *account_id);
344 }
345
346 for (client_order_id, order) in &self.orders {
348 let instrument_id = order.instrument_id();
349 let venue = instrument_id.venue;
350 let strategy_id = order.strategy_id();
351
352 self.index
354 .venue_orders
355 .entry(venue)
356 .or_default()
357 .insert(*client_order_id);
358
359 if let Some(venue_order_id) = order.venue_order_id() {
361 self.index
362 .venue_order_ids
363 .insert(venue_order_id, *client_order_id);
364 }
365
366 if let Some(position_id) = order.position_id() {
368 self.index
369 .order_position
370 .insert(*client_order_id, position_id);
371 }
372
373 self.index
375 .order_strategy
376 .insert(*client_order_id, order.strategy_id());
377
378 self.index
380 .instrument_orders
381 .entry(instrument_id)
382 .or_default()
383 .insert(*client_order_id);
384
385 self.index
387 .strategy_orders
388 .entry(strategy_id)
389 .or_default()
390 .insert(*client_order_id);
391
392 if let Some(account_id) = order.account_id() {
394 self.index
395 .account_orders
396 .entry(account_id)
397 .or_default()
398 .insert(*client_order_id);
399 }
400
401 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
403 self.index
404 .exec_algorithm_orders
405 .entry(exec_algorithm_id)
406 .or_default()
407 .insert(*client_order_id);
408 }
409
410 if let Some(exec_spawn_id) = order.exec_spawn_id() {
412 self.index
413 .exec_spawn_orders
414 .entry(exec_spawn_id)
415 .or_default()
416 .insert(*client_order_id);
417 }
418
419 self.index.orders.insert(*client_order_id);
421
422 if order.is_active_local() {
424 self.index.orders_active_local.insert(*client_order_id);
425 }
426
427 if order.is_open() {
429 self.index.orders_open.insert(*client_order_id);
430 }
431
432 if order.is_closed() {
434 self.index.orders_closed.insert(*client_order_id);
435 }
436
437 if let Some(emulation_trigger) = order.emulation_trigger()
439 && emulation_trigger != TriggerType::NoTrigger
440 && !order.is_closed()
441 {
442 self.index.orders_emulated.insert(*client_order_id);
443 }
444
445 if order.is_inflight() {
447 self.index.orders_inflight.insert(*client_order_id);
448 }
449
450 self.index.strategies.insert(strategy_id);
452
453 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
455 self.index.exec_algorithms.insert(exec_algorithm_id);
456 }
457 }
458
459 for (position_id, position) in &self.positions {
461 let instrument_id = position.instrument_id;
462 let venue = instrument_id.venue;
463 let strategy_id = position.strategy_id;
464
465 self.index
467 .venue_positions
468 .entry(venue)
469 .or_default()
470 .insert(*position_id);
471
472 self.index
474 .position_strategy
475 .insert(*position_id, position.strategy_id);
476
477 self.index
479 .position_orders
480 .entry(*position_id)
481 .or_default()
482 .extend(position.client_order_ids());
483
484 self.index
486 .instrument_positions
487 .entry(instrument_id)
488 .or_default()
489 .insert(*position_id);
490
491 self.index
493 .strategy_positions
494 .entry(strategy_id)
495 .or_default()
496 .insert(*position_id);
497
498 self.index
500 .account_positions
501 .entry(position.account_id)
502 .or_default()
503 .insert(*position_id);
504
505 self.index.positions.insert(*position_id);
507
508 if position.is_open() {
510 self.index.positions_open.insert(*position_id);
511 }
512
513 if position.is_closed() {
515 self.index.positions_closed.insert(*position_id);
516 }
517
518 self.index.strategies.insert(strategy_id);
520 }
521 }
522
523 #[must_use]
525 pub const fn has_backing(&self) -> bool {
526 self.config.database.is_some()
527 }
528
529 #[must_use]
531 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
532 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
533 quote
534 } else {
535 log::warn!(
536 "Cannot calculate unrealized PnL for {}, no quotes for {}",
537 position.id,
538 position.instrument_id
539 );
540 return None;
541 };
542
543 let last = match position.side {
545 PositionSide::Flat | PositionSide::NoPositionSide => {
546 return Some(Money::new(0.0, position.settlement_currency));
547 }
548 PositionSide::Long => quote.bid_price,
549 PositionSide::Short => quote.ask_price,
550 };
551
552 Some(position.unrealized_pnl(last))
553 }
554
555 #[must_use]
564 pub fn check_integrity(&mut self) -> bool {
565 let mut error_count = 0;
566 let failure = "Integrity failure";
567
568 let timestamp_us = SystemTime::now()
570 .duration_since(UNIX_EPOCH)
571 .expect("Time went backwards")
572 .as_micros();
573
574 log::info!("Checking data integrity");
575
576 for account_id in self.accounts.keys() {
578 if !self
579 .index
580 .venue_account
581 .contains_key(&account_id.get_issuer())
582 {
583 log::error!(
584 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
585 );
586 error_count += 1;
587 }
588 }
589
590 for (client_order_id, order) in &self.orders {
591 if !self.index.order_strategy.contains_key(client_order_id) {
592 log::error!(
593 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
594 );
595 error_count += 1;
596 }
597
598 if !self.index.orders.contains(client_order_id) {
599 log::error!(
600 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
601 );
602 error_count += 1;
603 }
604
605 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
606 log::error!(
607 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
608 );
609 error_count += 1;
610 }
611
612 if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
613 {
614 log::error!(
615 "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
616 );
617 error_count += 1;
618 }
619
620 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
621 log::error!(
622 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
623 );
624 error_count += 1;
625 }
626
627 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
628 log::error!(
629 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
630 );
631 error_count += 1;
632 }
633
634 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
635 if !self
636 .index
637 .exec_algorithm_orders
638 .contains_key(&exec_algorithm_id)
639 {
640 log::error!(
641 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
642 );
643 error_count += 1;
644 }
645
646 if order.exec_spawn_id().is_none()
647 && !self.index.exec_spawn_orders.contains_key(client_order_id)
648 {
649 log::error!(
650 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
651 );
652 error_count += 1;
653 }
654 }
655 }
656
657 for (position_id, position) in &self.positions {
658 if !self.index.position_strategy.contains_key(position_id) {
659 log::error!(
660 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
661 );
662 error_count += 1;
663 }
664
665 if !self.index.position_orders.contains_key(position_id) {
666 log::error!(
667 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
668 );
669 error_count += 1;
670 }
671
672 if !self.index.positions.contains(position_id) {
673 log::error!(
674 "{failure} in positions: {position_id} not found in `self.index.positions`",
675 );
676 error_count += 1;
677 }
678
679 if position.is_open() && !self.index.positions_open.contains(position_id) {
680 log::error!(
681 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
682 );
683 error_count += 1;
684 }
685
686 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
687 log::error!(
688 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
689 );
690 error_count += 1;
691 }
692 }
693
694 for account_id in self.index.venue_account.values() {
696 if !self.accounts.contains_key(account_id) {
697 log::error!(
698 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
699 );
700 error_count += 1;
701 }
702 }
703
704 for client_order_id in self.index.venue_order_ids.values() {
705 if !self.orders.contains_key(client_order_id) {
706 log::error!(
707 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
708 );
709 error_count += 1;
710 }
711 }
712
713 for client_order_id in self.index.client_order_ids.keys() {
714 if !self.orders.contains_key(client_order_id) {
715 log::error!(
716 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
717 );
718 error_count += 1;
719 }
720 }
721
722 for client_order_id in self.index.order_position.keys() {
723 if !self.orders.contains_key(client_order_id) {
724 log::error!(
725 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
726 );
727 error_count += 1;
728 }
729 }
730
731 for client_order_id in self.index.order_strategy.keys() {
733 if !self.orders.contains_key(client_order_id) {
734 log::error!(
735 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
736 );
737 error_count += 1;
738 }
739 }
740
741 for position_id in self.index.position_strategy.keys() {
742 if !self.positions.contains_key(position_id) {
743 log::error!(
744 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
745 );
746 error_count += 1;
747 }
748 }
749
750 for position_id in self.index.position_orders.keys() {
751 if !self.positions.contains_key(position_id) {
752 log::error!(
753 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
754 );
755 error_count += 1;
756 }
757 }
758
759 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
760 for client_order_id in client_order_ids {
761 if !self.orders.contains_key(client_order_id) {
762 log::error!(
763 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
764 );
765 error_count += 1;
766 }
767 }
768 }
769
770 for instrument_id in self.index.instrument_positions.keys() {
771 if !self.index.instrument_orders.contains_key(instrument_id) {
772 log::error!(
773 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
774 );
775 error_count += 1;
776 }
777 }
778
779 for client_order_ids in self.index.strategy_orders.values() {
780 for client_order_id in client_order_ids {
781 if !self.orders.contains_key(client_order_id) {
782 log::error!(
783 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
784 );
785 error_count += 1;
786 }
787 }
788 }
789
790 for position_ids in self.index.strategy_positions.values() {
791 for position_id in position_ids {
792 if !self.positions.contains_key(position_id) {
793 log::error!(
794 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
795 );
796 error_count += 1;
797 }
798 }
799 }
800
801 for client_order_id in &self.index.orders {
802 if !self.orders.contains_key(client_order_id) {
803 log::error!(
804 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
805 );
806 error_count += 1;
807 }
808 }
809
810 for client_order_id in &self.index.orders_emulated {
811 if !self.orders.contains_key(client_order_id) {
812 log::error!(
813 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
814 );
815 error_count += 1;
816 }
817 }
818
819 for client_order_id in &self.index.orders_active_local {
820 if !self.orders.contains_key(client_order_id) {
821 log::error!(
822 "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
823 );
824 error_count += 1;
825 }
826 }
827
828 for client_order_id in &self.index.orders_inflight {
829 if !self.orders.contains_key(client_order_id) {
830 log::error!(
831 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
832 );
833 error_count += 1;
834 }
835 }
836
837 for client_order_id in &self.index.orders_open {
838 if !self.orders.contains_key(client_order_id) {
839 log::error!(
840 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
841 );
842 error_count += 1;
843 }
844 }
845
846 for client_order_id in &self.index.orders_closed {
847 if !self.orders.contains_key(client_order_id) {
848 log::error!(
849 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
850 );
851 error_count += 1;
852 }
853 }
854
855 for position_id in &self.index.positions {
856 if !self.positions.contains_key(position_id) {
857 log::error!(
858 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
859 );
860 error_count += 1;
861 }
862 }
863
864 for position_id in &self.index.positions_open {
865 if !self.positions.contains_key(position_id) {
866 log::error!(
867 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
868 );
869 error_count += 1;
870 }
871 }
872
873 for position_id in &self.index.positions_closed {
874 if !self.positions.contains_key(position_id) {
875 log::error!(
876 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
877 );
878 error_count += 1;
879 }
880 }
881
882 for strategy_id in &self.index.strategies {
883 if !self.index.strategy_orders.contains_key(strategy_id) {
884 log::error!(
885 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
886 );
887 error_count += 1;
888 }
889 }
890
891 for exec_algorithm_id in &self.index.exec_algorithms {
892 if !self
893 .index
894 .exec_algorithm_orders
895 .contains_key(exec_algorithm_id)
896 {
897 log::error!(
898 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
899 );
900 error_count += 1;
901 }
902 }
903
904 let total_us = SystemTime::now()
905 .duration_since(UNIX_EPOCH)
906 .expect("Time went backwards")
907 .as_micros()
908 - timestamp_us;
909
910 if error_count == 0 {
911 log::info!("Integrity check passed in {total_us}μs");
912 true
913 } else {
914 log::error!(
915 "Integrity check failed with {error_count} error{} in {total_us}μs",
916 if error_count == 1 { "" } else { "s" },
917 );
918 false
919 }
920 }
921
922 #[must_use]
926 pub fn check_residuals(&self) -> bool {
927 log::debug!("Checking residuals");
928
929 let mut residuals = false;
930
931 for order in self.orders_open(None, None, None, None, None) {
933 residuals = true;
934 log::warn!("Residual {order}");
935 }
936
937 for position in self.positions_open(None, None, None, None, None) {
939 residuals = true;
940 log::warn!("Residual {position}");
941 }
942
943 residuals
944 }
945
946 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
952 log::debug!(
953 "Purging closed orders{}",
954 if buffer_secs > 0 {
955 format!(" with buffer_secs={buffer_secs}")
956 } else {
957 String::new()
958 }
959 );
960
961 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
962
963 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
964
965 'outer: for client_order_id in self.index.orders_closed.clone() {
966 if let Some(order) = self.orders.get(&client_order_id)
967 && order.is_closed()
968 && let Some(ts_closed) = order.ts_closed()
969 && ts_closed + buffer_ns <= ts_now
970 {
971 if let Some(linked_order_ids) = order.linked_order_ids() {
973 for linked_order_id in linked_order_ids {
974 if let Some(linked_order) = self.orders.get(linked_order_id)
975 && linked_order.is_open()
976 {
977 continue 'outer;
979 }
980 }
981 }
982
983 if let Some(order_list_id) = order.order_list_id() {
984 affected_order_list_ids.insert(order_list_id);
985 }
986
987 self.purge_order(client_order_id);
988 }
989 }
990
991 for order_list_id in affected_order_list_ids {
992 if let Some(order_list) = self.order_lists.get(&order_list_id) {
993 let all_purged = order_list
994 .client_order_ids
995 .iter()
996 .all(|id| !self.orders.contains_key(id));
997
998 if all_purged {
999 self.order_lists.remove(&order_list_id);
1000 log::info!("Purged {order_list_id}");
1001 }
1002 }
1003 }
1004 }
1005
1006 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1008 log::debug!(
1009 "Purging closed positions{}",
1010 if buffer_secs > 0 {
1011 format!(" with buffer_secs={buffer_secs}")
1012 } else {
1013 String::new()
1014 }
1015 );
1016
1017 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1018
1019 for position_id in self.index.positions_closed.clone() {
1020 if let Some(position) = self.positions.get(&position_id)
1021 && position.is_closed()
1022 && let Some(ts_closed) = position.ts_closed
1023 && ts_closed + buffer_ns <= ts_now
1024 {
1025 self.purge_position(position_id);
1026 }
1027 }
1028 }
1029
1030 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1034 let order = self.orders.get(&client_order_id).cloned();
1036
1037 if let Some(ref ord) = order
1039 && ord.is_open()
1040 {
1041 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1042 return;
1043 }
1044
1045 if let Some(ref ord) = order {
1047 self.orders.remove(&client_order_id);
1049
1050 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1052 {
1053 venue_orders.remove(&client_order_id);
1054 if venue_orders.is_empty() {
1055 self.index.venue_orders.remove(&ord.instrument_id().venue);
1056 }
1057 }
1058
1059 if let Some(venue_order_id) = ord.venue_order_id() {
1061 self.index.venue_order_ids.remove(&venue_order_id);
1062 }
1063
1064 if let Some(instrument_orders) =
1066 self.index.instrument_orders.get_mut(&ord.instrument_id())
1067 {
1068 instrument_orders.remove(&client_order_id);
1069 if instrument_orders.is_empty() {
1070 self.index.instrument_orders.remove(&ord.instrument_id());
1071 }
1072 }
1073
1074 if let Some(position_id) = ord.position_id()
1076 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1077 {
1078 position_orders.remove(&client_order_id);
1079 if position_orders.is_empty() {
1080 self.index.position_orders.remove(&position_id);
1081 }
1082 }
1083
1084 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1086 && let Some(exec_algorithm_orders) =
1087 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1088 {
1089 exec_algorithm_orders.remove(&client_order_id);
1090 if exec_algorithm_orders.is_empty() {
1091 self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1092 }
1093 }
1094
1095 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1097 strategy_orders.remove(&client_order_id);
1098 if strategy_orders.is_empty() {
1099 self.index.strategy_orders.remove(&ord.strategy_id());
1100 }
1101 }
1102
1103 if let Some(account_id) = ord.account_id()
1105 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1106 {
1107 account_orders.remove(&client_order_id);
1108 if account_orders.is_empty() {
1109 self.index.account_orders.remove(&account_id);
1110 }
1111 }
1112
1113 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1115 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1116 {
1117 spawn_orders.remove(&client_order_id);
1118 if spawn_orders.is_empty() {
1119 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1120 }
1121 }
1122
1123 log::info!("Purged order {client_order_id}");
1124 } else {
1125 log::warn!("Order {client_order_id} not found when purging");
1126 }
1127
1128 self.index.order_position.remove(&client_order_id);
1130 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1131 self.index.order_client.remove(&client_order_id);
1132 self.index.client_order_ids.remove(&client_order_id);
1133
1134 if let Some(strategy_id) = strategy_id
1136 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1137 {
1138 strategy_orders.remove(&client_order_id);
1139 if strategy_orders.is_empty() {
1140 self.index.strategy_orders.remove(&strategy_id);
1141 }
1142 }
1143
1144 self.index.exec_spawn_orders.remove(&client_order_id);
1146
1147 self.index.orders.remove(&client_order_id);
1148 self.index.orders_active_local.remove(&client_order_id);
1149 self.index.orders_open.remove(&client_order_id);
1150 self.index.orders_closed.remove(&client_order_id);
1151 self.index.orders_emulated.remove(&client_order_id);
1152 self.index.orders_inflight.remove(&client_order_id);
1153 self.index.orders_pending_cancel.remove(&client_order_id);
1154 }
1155
1156 pub fn purge_position(&mut self, position_id: PositionId) {
1160 let position = self.positions.get(&position_id).cloned();
1162
1163 if let Some(ref pos) = position
1165 && pos.is_open()
1166 {
1167 log::warn!("Position {position_id} found open when purging, skipping purge");
1168 return;
1169 }
1170
1171 if let Some(ref pos) = position {
1173 self.positions.remove(&position_id);
1174
1175 if let Some(venue_positions) =
1177 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1178 {
1179 venue_positions.remove(&position_id);
1180 if venue_positions.is_empty() {
1181 self.index.venue_positions.remove(&pos.instrument_id.venue);
1182 }
1183 }
1184
1185 if let Some(instrument_positions) =
1187 self.index.instrument_positions.get_mut(&pos.instrument_id)
1188 {
1189 instrument_positions.remove(&position_id);
1190 if instrument_positions.is_empty() {
1191 self.index.instrument_positions.remove(&pos.instrument_id);
1192 }
1193 }
1194
1195 if let Some(strategy_positions) =
1197 self.index.strategy_positions.get_mut(&pos.strategy_id)
1198 {
1199 strategy_positions.remove(&position_id);
1200 if strategy_positions.is_empty() {
1201 self.index.strategy_positions.remove(&pos.strategy_id);
1202 }
1203 }
1204
1205 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1207 account_positions.remove(&position_id);
1208 if account_positions.is_empty() {
1209 self.index.account_positions.remove(&pos.account_id);
1210 }
1211 }
1212
1213 for client_order_id in pos.client_order_ids() {
1215 self.index.order_position.remove(&client_order_id);
1216 }
1217
1218 log::info!("Purged position {position_id}");
1219 } else {
1220 log::warn!("Position {position_id} not found when purging");
1221 }
1222
1223 self.index.position_strategy.remove(&position_id);
1225 self.index.position_orders.remove(&position_id);
1226 self.index.positions.remove(&position_id);
1227 self.index.positions_open.remove(&position_id);
1228 self.index.positions_closed.remove(&position_id);
1229
1230 self.position_snapshots.remove(&position_id);
1232 }
1233
1234 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1239 log::debug!(
1240 "Purging account events{}",
1241 if lookback_secs > 0 {
1242 format!(" with lookback_secs={lookback_secs}")
1243 } else {
1244 String::new()
1245 }
1246 );
1247
1248 for account in self.accounts.values_mut() {
1249 let event_count = account.event_count();
1250 account.purge_account_events(ts_now, lookback_secs);
1251 let count_diff = event_count - account.event_count();
1252 if count_diff > 0 {
1253 log::info!(
1254 "Purged {} event(s) from account {}",
1255 count_diff,
1256 account.id()
1257 );
1258 }
1259 }
1260 }
1261
1262 pub fn clear_index(&mut self) {
1264 self.index.clear();
1265 log::debug!("Cleared index");
1266 }
1267
1268 pub fn reset(&mut self) {
1272 log::debug!("Resetting cache");
1273
1274 self.general.clear();
1275 self.currencies.clear();
1276 self.instruments.clear();
1277 self.synthetics.clear();
1278 self.books.clear();
1279 self.own_books.clear();
1280 self.quotes.clear();
1281 self.trades.clear();
1282 self.mark_xrates.clear();
1283 self.mark_prices.clear();
1284 self.index_prices.clear();
1285 self.funding_rates.clear();
1286 self.bars.clear();
1287 self.accounts.clear();
1288 self.orders.clear();
1289 self.order_lists.clear();
1290 self.positions.clear();
1291 self.position_snapshots.clear();
1292 self.greeks.clear();
1293 self.yield_curves.clear();
1294
1295 #[cfg(feature = "defi")]
1296 {
1297 self.defi.pools.clear();
1298 self.defi.pool_profilers.clear();
1299 }
1300
1301 self.clear_index();
1302
1303 log::info!("Reset cache");
1304 }
1305
1306 pub fn dispose(&mut self) {
1310 self.reset();
1311
1312 if let Some(database) = &mut self.database
1313 && let Err(e) = database.close()
1314 {
1315 log::error!("Failed to close database during dispose: {e}");
1316 }
1317 }
1318
1319 pub fn flush_db(&mut self) {
1323 if let Some(database) = &mut self.database
1324 && let Err(e) = database.flush()
1325 {
1326 log::error!("Failed to flush database: {e}");
1327 }
1328 }
1329
1330 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1338 check_valid_string_ascii(key, stringify!(key))?;
1339 check_predicate_false(value.is_empty(), stringify!(value))?;
1340
1341 log::debug!("Adding general {key}");
1342 self.general.insert(key.to_string(), value.clone());
1343
1344 if let Some(database) = &mut self.database {
1345 database.add(key.to_string(), value)?;
1346 }
1347 Ok(())
1348 }
1349
1350 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1356 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1357
1358 if self.config.save_market_data
1359 && let Some(database) = &mut self.database
1360 {
1361 database.add_order_book(&book)?;
1362 }
1363
1364 self.books.insert(book.instrument_id, book);
1365 Ok(())
1366 }
1367
1368 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1374 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1375
1376 self.own_books.insert(own_book.instrument_id, own_book);
1377 Ok(())
1378 }
1379
1380 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1386 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1387
1388 if self.config.save_market_data {
1389 }
1391
1392 let mark_prices_deque = self
1393 .mark_prices
1394 .entry(mark_price.instrument_id)
1395 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1396 mark_prices_deque.push_front(mark_price);
1397 Ok(())
1398 }
1399
1400 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1406 log::debug!(
1407 "Adding `IndexPriceUpdate` for {}",
1408 index_price.instrument_id
1409 );
1410
1411 if self.config.save_market_data {
1412 }
1414
1415 let index_prices_deque = self
1416 .index_prices
1417 .entry(index_price.instrument_id)
1418 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1419 index_prices_deque.push_front(index_price);
1420 Ok(())
1421 }
1422
1423 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1429 log::debug!(
1430 "Adding `FundingRateUpdate` for {}",
1431 funding_rate.instrument_id
1432 );
1433
1434 if self.config.save_market_data {
1435 }
1437
1438 let funding_rates_deque = self
1439 .funding_rates
1440 .entry(funding_rate.instrument_id)
1441 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1442 funding_rates_deque.push_front(funding_rate);
1443 Ok(())
1444 }
1445
1446 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1452 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1453
1454 let instrument_id = funding_rates[0].instrument_id;
1455 log::debug!(
1456 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1457 funding_rates.len()
1458 );
1459
1460 if self.config.save_market_data
1461 && let Some(database) = &mut self.database
1462 {
1463 for funding_rate in funding_rates {
1464 database.add_funding_rate(funding_rate)?;
1465 }
1466 }
1467
1468 let funding_rate_deque = self
1469 .funding_rates
1470 .entry(instrument_id)
1471 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1472
1473 for funding_rate in funding_rates {
1474 funding_rate_deque.push_front(*funding_rate);
1475 }
1476 Ok(())
1477 }
1478
1479 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1485 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1486
1487 if self.config.save_market_data
1488 && let Some(database) = &mut self.database
1489 {
1490 database.add_quote("e)?;
1491 }
1492
1493 let quotes_deque = self
1494 .quotes
1495 .entry(quote.instrument_id)
1496 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1497 quotes_deque.push_front(quote);
1498 Ok(())
1499 }
1500
1501 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1507 check_slice_not_empty(quotes, stringify!(quotes))?;
1508
1509 let instrument_id = quotes[0].instrument_id;
1510 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1511
1512 if self.config.save_market_data
1513 && let Some(database) = &mut self.database
1514 {
1515 for quote in quotes {
1516 database.add_quote(quote)?;
1517 }
1518 }
1519
1520 let quotes_deque = self
1521 .quotes
1522 .entry(instrument_id)
1523 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1524
1525 for quote in quotes {
1526 quotes_deque.push_front(*quote);
1527 }
1528 Ok(())
1529 }
1530
1531 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1537 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1538
1539 if self.config.save_market_data
1540 && let Some(database) = &mut self.database
1541 {
1542 database.add_trade(&trade)?;
1543 }
1544
1545 let trades_deque = self
1546 .trades
1547 .entry(trade.instrument_id)
1548 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1549 trades_deque.push_front(trade);
1550 Ok(())
1551 }
1552
1553 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1559 check_slice_not_empty(trades, stringify!(trades))?;
1560
1561 let instrument_id = trades[0].instrument_id;
1562 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1563
1564 if self.config.save_market_data
1565 && let Some(database) = &mut self.database
1566 {
1567 for trade in trades {
1568 database.add_trade(trade)?;
1569 }
1570 }
1571
1572 let trades_deque = self
1573 .trades
1574 .entry(instrument_id)
1575 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1576
1577 for trade in trades {
1578 trades_deque.push_front(*trade);
1579 }
1580 Ok(())
1581 }
1582
1583 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1589 log::debug!("Adding `Bar` {}", bar.bar_type);
1590
1591 if self.config.save_market_data
1592 && let Some(database) = &mut self.database
1593 {
1594 database.add_bar(&bar)?;
1595 }
1596
1597 let bars = self
1598 .bars
1599 .entry(bar.bar_type)
1600 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1601 bars.push_front(bar);
1602 Ok(())
1603 }
1604
1605 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1611 check_slice_not_empty(bars, stringify!(bars))?;
1612
1613 let bar_type = bars[0].bar_type;
1614 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1615
1616 if self.config.save_market_data
1617 && let Some(database) = &mut self.database
1618 {
1619 for bar in bars {
1620 database.add_bar(bar)?;
1621 }
1622 }
1623
1624 let bars_deque = self
1625 .bars
1626 .entry(bar_type)
1627 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1628
1629 for bar in bars {
1630 bars_deque.push_front(*bar);
1631 }
1632 Ok(())
1633 }
1634
1635 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1641 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1642
1643 if self.config.save_market_data
1644 && let Some(_database) = &mut self.database
1645 {
1646 }
1648
1649 self.greeks.insert(greeks.instrument_id, greeks);
1650 Ok(())
1651 }
1652
1653 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1655 self.greeks.get(instrument_id).cloned()
1656 }
1657
1658 pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1660 log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1661 self.option_greeks.insert(greeks.instrument_id, greeks);
1662 }
1663
1664 #[must_use]
1666 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1667 self.option_greeks.get(instrument_id)
1668 }
1669
1670 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1676 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1677
1678 if self.config.save_market_data
1679 && let Some(_database) = &mut self.database
1680 {
1681 }
1683
1684 self.yield_curves
1685 .insert(yield_curve.curve_name.clone(), yield_curve);
1686 Ok(())
1687 }
1688
1689 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1691 self.yield_curves.get(key).map(|curve| {
1692 let curve_clone = curve.clone();
1693 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1694 as Box<dyn Fn(f64) -> f64>
1695 })
1696 }
1697
1698 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1704 if self.currencies.contains_key(¤cy.code) {
1705 return Ok(());
1706 }
1707 log::debug!("Adding `Currency` {}", currency.code);
1708
1709 if let Some(database) = &mut self.database {
1710 database.add_currency(¤cy)?;
1711 }
1712
1713 self.currencies.insert(currency.code, currency);
1714 Ok(())
1715 }
1716
1717 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1723 log::debug!("Adding `Instrument` {}", instrument.id());
1724
1725 if let Some(base_currency) = instrument.base_currency() {
1727 self.add_currency(base_currency)?;
1728 }
1729 self.add_currency(instrument.quote_currency())?;
1730 self.add_currency(instrument.settlement_currency())?;
1731
1732 if let Some(database) = &mut self.database {
1733 database.add_instrument(&instrument)?;
1734 }
1735
1736 self.instruments.insert(instrument.id(), instrument);
1737 Ok(())
1738 }
1739
1740 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1746 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1747
1748 if let Some(database) = &mut self.database {
1749 database.add_synthetic(&synthetic)?;
1750 }
1751
1752 self.synthetics.insert(synthetic.id, synthetic);
1753 Ok(())
1754 }
1755
1756 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1762 log::debug!("Adding `Account` {}", account.id());
1763
1764 if let Some(database) = &mut self.database {
1765 database.add_account(&account)?;
1766 }
1767
1768 let account_id = account.id();
1769 self.accounts.insert(account_id, account);
1770 self.index
1771 .venue_account
1772 .insert(account_id.get_issuer(), account_id);
1773 Ok(())
1774 }
1775
1776 pub fn add_venue_order_id(
1784 &mut self,
1785 client_order_id: &ClientOrderId,
1786 venue_order_id: &VenueOrderId,
1787 overwrite: bool,
1788 ) -> anyhow::Result<()> {
1789 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1790 && !overwrite
1791 && existing_venue_order_id != venue_order_id
1792 {
1793 anyhow::bail!(
1794 "Existing {existing_venue_order_id} for {client_order_id}
1795 did not match the given {venue_order_id}.
1796 If you are writing a test then try a different `venue_order_id`,
1797 otherwise this is probably a bug."
1798 );
1799 }
1800
1801 self.index
1802 .client_order_ids
1803 .insert(*client_order_id, *venue_order_id);
1804 self.index
1805 .venue_order_ids
1806 .insert(*venue_order_id, *client_order_id);
1807
1808 Ok(())
1809 }
1810
1811 pub fn add_order(
1823 &mut self,
1824 order: OrderAny,
1825 position_id: Option<PositionId>,
1826 client_id: Option<ClientId>,
1827 replace_existing: bool,
1828 ) -> anyhow::Result<()> {
1829 let instrument_id = order.instrument_id();
1830 let venue = instrument_id.venue;
1831 let client_order_id = order.client_order_id();
1832 let strategy_id = order.strategy_id();
1833 let exec_algorithm_id = order.exec_algorithm_id();
1834 let exec_spawn_id = order.exec_spawn_id();
1835
1836 if !replace_existing {
1837 check_key_not_in_map(
1838 &client_order_id,
1839 &self.orders,
1840 stringify!(client_order_id),
1841 stringify!(orders),
1842 )?;
1843 }
1844
1845 log::debug!("Adding {order:?}");
1846
1847 self.index.orders.insert(client_order_id);
1848
1849 if order.is_active_local() {
1850 self.index.orders_active_local.insert(client_order_id);
1851 }
1852 self.index
1853 .order_strategy
1854 .insert(client_order_id, strategy_id);
1855 self.index.strategies.insert(strategy_id);
1856
1857 self.index
1859 .venue_orders
1860 .entry(venue)
1861 .or_default()
1862 .insert(client_order_id);
1863
1864 self.index
1866 .instrument_orders
1867 .entry(instrument_id)
1868 .or_default()
1869 .insert(client_order_id);
1870
1871 self.index
1873 .strategy_orders
1874 .entry(strategy_id)
1875 .or_default()
1876 .insert(client_order_id);
1877
1878 if let Some(account_id) = order.account_id() {
1880 self.index
1881 .account_orders
1882 .entry(account_id)
1883 .or_default()
1884 .insert(client_order_id);
1885 }
1886
1887 if let Some(exec_algorithm_id) = exec_algorithm_id {
1889 self.index.exec_algorithms.insert(exec_algorithm_id);
1890
1891 self.index
1892 .exec_algorithm_orders
1893 .entry(exec_algorithm_id)
1894 .or_default()
1895 .insert(client_order_id);
1896 }
1897
1898 if let Some(exec_spawn_id) = exec_spawn_id {
1900 self.index
1901 .exec_spawn_orders
1902 .entry(exec_spawn_id)
1903 .or_default()
1904 .insert(client_order_id);
1905 }
1906
1907 if let Some(emulation_trigger) = order.emulation_trigger()
1909 && emulation_trigger != TriggerType::NoTrigger
1910 {
1911 self.index.orders_emulated.insert(client_order_id);
1912 }
1913
1914 if let Some(position_id) = position_id {
1916 self.add_position_id(
1917 &position_id,
1918 &order.instrument_id().venue,
1919 &client_order_id,
1920 &strategy_id,
1921 )?;
1922 }
1923
1924 if let Some(client_id) = client_id {
1926 self.index.order_client.insert(client_order_id, client_id);
1927 log::debug!("Indexed {client_id:?}");
1928 }
1929
1930 if let Some(database) = &mut self.database {
1931 database.add_order(&order, client_id)?;
1932 }
1937
1938 self.orders.insert(client_order_id, order);
1939
1940 Ok(())
1941 }
1942
1943 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1949 let order_list_id = order_list.id;
1950 check_key_not_in_map(
1951 &order_list_id,
1952 &self.order_lists,
1953 stringify!(order_list_id),
1954 stringify!(order_lists),
1955 )?;
1956
1957 log::debug!("Adding {order_list:?}");
1958 self.order_lists.insert(order_list_id, order_list);
1959 Ok(())
1960 }
1961
1962 pub fn add_position_id(
1968 &mut self,
1969 position_id: &PositionId,
1970 venue: &Venue,
1971 client_order_id: &ClientOrderId,
1972 strategy_id: &StrategyId,
1973 ) -> anyhow::Result<()> {
1974 self.index
1975 .order_position
1976 .insert(*client_order_id, *position_id);
1977
1978 if let Some(database) = &mut self.database {
1980 database.index_order_position(*client_order_id, *position_id)?;
1981 }
1982
1983 self.index
1985 .position_strategy
1986 .insert(*position_id, *strategy_id);
1987
1988 self.index
1990 .position_orders
1991 .entry(*position_id)
1992 .or_default()
1993 .insert(*client_order_id);
1994
1995 self.index
1997 .strategy_positions
1998 .entry(*strategy_id)
1999 .or_default()
2000 .insert(*position_id);
2001
2002 self.index
2004 .venue_positions
2005 .entry(*venue)
2006 .or_default()
2007 .insert(*position_id);
2008
2009 Ok(())
2010 }
2011
2012 pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2018 self.positions.insert(position.id, position.clone());
2019 self.index.positions.insert(position.id);
2020 self.index.positions_open.insert(position.id);
2021 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
2024
2025 self.add_position_id(
2026 &position.id,
2027 &position.instrument_id.venue,
2028 &position.opening_order_id,
2029 &position.strategy_id,
2030 )?;
2031
2032 let venue = position.instrument_id.venue;
2033 let venue_positions = self.index.venue_positions.entry(venue).or_default();
2034 venue_positions.insert(position.id);
2035
2036 let instrument_id = position.instrument_id;
2038 let instrument_positions = self
2039 .index
2040 .instrument_positions
2041 .entry(instrument_id)
2042 .or_default();
2043 instrument_positions.insert(position.id);
2044
2045 self.index
2047 .account_positions
2048 .entry(position.account_id)
2049 .or_default()
2050 .insert(position.id);
2051
2052 if let Some(database) = &mut self.database {
2053 database.add_position(position)?;
2054 }
2063
2064 Ok(())
2065 }
2066
2067 pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2073 let account_id = account.id();
2074 self.accounts.insert(account_id, account.clone());
2075
2076 if let Some(database) = &mut self.database {
2077 database.update_account(account)?;
2078 }
2079 Ok(())
2080 }
2081
2082 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2088 let client_order_id = order.client_order_id();
2089
2090 if order.is_active_local() {
2091 self.index.orders_active_local.insert(client_order_id);
2092 } else {
2093 self.index.orders_active_local.remove(&client_order_id);
2094 }
2095
2096 if let Some(venue_order_id) = order.venue_order_id() {
2098 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2101 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2103 }
2104 }
2105
2106 if order.is_inflight() {
2108 self.index.orders_inflight.insert(client_order_id);
2109 } else {
2110 self.index.orders_inflight.remove(&client_order_id);
2111 }
2112
2113 if order.is_open() {
2115 self.index.orders_closed.remove(&client_order_id);
2116 self.index.orders_open.insert(client_order_id);
2117 } else if order.is_closed() {
2118 self.index.orders_open.remove(&client_order_id);
2119 self.index.orders_pending_cancel.remove(&client_order_id);
2120 self.index.orders_closed.insert(client_order_id);
2121 }
2122
2123 if let Some(emulation_trigger) = order.emulation_trigger()
2125 && emulation_trigger != TriggerType::NoTrigger
2126 && !order.is_closed()
2127 {
2128 self.index.orders_emulated.insert(client_order_id);
2129 } else {
2130 self.index.orders_emulated.remove(&client_order_id);
2131 }
2132
2133 if let Some(account_id) = order.account_id() {
2135 self.index
2136 .account_orders
2137 .entry(account_id)
2138 .or_default()
2139 .insert(client_order_id);
2140 }
2141
2142 if self.own_order_book(&order.instrument_id()).is_some()
2144 && should_handle_own_book_order(order)
2145 {
2146 self.update_own_order_book(order);
2147 }
2148
2149 if let Some(database) = &mut self.database {
2150 database.update_order(order.last_event())?;
2151 }
2156
2157 self.orders.insert(client_order_id, order.clone());
2159
2160 Ok(())
2161 }
2162
2163 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2165 self.index
2166 .orders_pending_cancel
2167 .insert(order.client_order_id());
2168 }
2169
2170 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2176 if position.is_open() {
2179 self.index.positions_open.insert(position.id);
2180 self.index.positions_closed.remove(&position.id);
2181 } else {
2182 self.index.positions_closed.insert(position.id);
2183 self.index.positions_open.remove(&position.id);
2184 }
2185
2186 if let Some(database) = &mut self.database {
2187 database.update_position(position)?;
2188 }
2193
2194 self.positions.insert(position.id, position.clone());
2195
2196 Ok(())
2197 }
2198
2199 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2206 let position_id = position.id;
2207
2208 let mut copied_position = position.clone();
2209 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2210 copied_position.id = PositionId::new(new_id);
2211
2212 let position_serialized = serde_json::to_vec(&copied_position)?;
2214
2215 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2216 let new_snapshots = match snapshots {
2217 Some(existing_snapshots) => {
2218 let mut combined = existing_snapshots.to_vec();
2219 combined.extend(position_serialized);
2220 Bytes::from(combined)
2221 }
2222 None => Bytes::from(position_serialized),
2223 };
2224 self.position_snapshots.insert(position_id, new_snapshots);
2225
2226 log::debug!("Snapshot {copied_position}");
2227 Ok(())
2228 }
2229
2230 pub fn snapshot_position_state(
2236 &mut self,
2237 position: &Position,
2238 open_only: Option<bool>,
2241 ) -> anyhow::Result<()> {
2242 let open_only = open_only.unwrap_or(true);
2243
2244 if open_only && !position.is_open() {
2245 return Ok(());
2246 }
2247
2248 if let Some(database) = &mut self.database {
2249 database.snapshot_position_state(position).map_err(|e| {
2250 log::error!(
2251 "Failed to snapshot position state for {}: {e:?}",
2252 position.id
2253 );
2254 e
2255 })?;
2256 } else {
2257 log::warn!(
2258 "Cannot snapshot position state for {} (no database configured)",
2259 position.id
2260 );
2261 }
2262
2263 todo!()
2265 }
2266
2267 #[must_use]
2269 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2270 if self.index.position_strategy.contains_key(position_id) {
2272 Some(OmsType::Netting)
2275 } else {
2276 None
2277 }
2278 }
2279
2280 #[must_use]
2282 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2283 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2284 }
2285
2286 #[must_use]
2288 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2289 let mut result = AHashSet::new();
2291 for (position_id, _) in &self.position_snapshots {
2292 if let Some(position) = self.positions.get(position_id)
2294 && position.instrument_id == *instrument_id
2295 {
2296 result.insert(*position_id);
2297 }
2298 }
2299 result
2300 }
2301
2302 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2308 let database = if let Some(database) = &self.database {
2309 database
2310 } else {
2311 log::warn!(
2312 "Cannot snapshot order state for {} (no database configured)",
2313 order.client_order_id()
2314 );
2315 return Ok(());
2316 };
2317
2318 database.snapshot_order_state(order)
2319 }
2320
2321 fn build_order_query_filter_set(
2324 &self,
2325 venue: Option<&Venue>,
2326 instrument_id: Option<&InstrumentId>,
2327 strategy_id: Option<&StrategyId>,
2328 account_id: Option<&AccountId>,
2329 ) -> Option<AHashSet<ClientOrderId>> {
2330 let mut query: Option<AHashSet<ClientOrderId>> = None;
2331
2332 if let Some(venue) = venue {
2333 query = Some(
2334 self.index
2335 .venue_orders
2336 .get(venue)
2337 .cloned()
2338 .unwrap_or_default(),
2339 );
2340 }
2341
2342 if let Some(instrument_id) = instrument_id {
2343 let instrument_orders = self
2344 .index
2345 .instrument_orders
2346 .get(instrument_id)
2347 .cloned()
2348 .unwrap_or_default();
2349
2350 if let Some(existing_query) = &mut query {
2351 *existing_query = existing_query
2352 .intersection(&instrument_orders)
2353 .copied()
2354 .collect();
2355 } else {
2356 query = Some(instrument_orders);
2357 }
2358 }
2359
2360 if let Some(strategy_id) = strategy_id {
2361 let strategy_orders = self
2362 .index
2363 .strategy_orders
2364 .get(strategy_id)
2365 .cloned()
2366 .unwrap_or_default();
2367
2368 if let Some(existing_query) = &mut query {
2369 *existing_query = existing_query
2370 .intersection(&strategy_orders)
2371 .copied()
2372 .collect();
2373 } else {
2374 query = Some(strategy_orders);
2375 }
2376 }
2377
2378 if let Some(account_id) = account_id {
2379 let account_orders = self
2380 .index
2381 .account_orders
2382 .get(account_id)
2383 .cloned()
2384 .unwrap_or_default();
2385
2386 if let Some(existing_query) = &mut query {
2387 *existing_query = existing_query
2388 .intersection(&account_orders)
2389 .copied()
2390 .collect();
2391 } else {
2392 query = Some(account_orders);
2393 }
2394 }
2395
2396 query
2397 }
2398
2399 fn build_position_query_filter_set(
2400 &self,
2401 venue: Option<&Venue>,
2402 instrument_id: Option<&InstrumentId>,
2403 strategy_id: Option<&StrategyId>,
2404 account_id: Option<&AccountId>,
2405 ) -> Option<AHashSet<PositionId>> {
2406 let mut query: Option<AHashSet<PositionId>> = None;
2407
2408 if let Some(venue) = venue {
2409 query = Some(
2410 self.index
2411 .venue_positions
2412 .get(venue)
2413 .cloned()
2414 .unwrap_or_default(),
2415 );
2416 }
2417
2418 if let Some(instrument_id) = instrument_id {
2419 let instrument_positions = self
2420 .index
2421 .instrument_positions
2422 .get(instrument_id)
2423 .cloned()
2424 .unwrap_or_default();
2425
2426 if let Some(existing_query) = query {
2427 query = Some(
2428 existing_query
2429 .intersection(&instrument_positions)
2430 .copied()
2431 .collect(),
2432 );
2433 } else {
2434 query = Some(instrument_positions);
2435 }
2436 }
2437
2438 if let Some(strategy_id) = strategy_id {
2439 let strategy_positions = self
2440 .index
2441 .strategy_positions
2442 .get(strategy_id)
2443 .cloned()
2444 .unwrap_or_default();
2445
2446 if let Some(existing_query) = query {
2447 query = Some(
2448 existing_query
2449 .intersection(&strategy_positions)
2450 .copied()
2451 .collect(),
2452 );
2453 } else {
2454 query = Some(strategy_positions);
2455 }
2456 }
2457
2458 if let Some(account_id) = account_id {
2459 let account_positions = self
2460 .index
2461 .account_positions
2462 .get(account_id)
2463 .cloned()
2464 .unwrap_or_default();
2465
2466 if let Some(existing_query) = query {
2467 query = Some(
2468 existing_query
2469 .intersection(&account_positions)
2470 .copied()
2471 .collect(),
2472 );
2473 } else {
2474 query = Some(account_positions);
2475 }
2476 }
2477
2478 query
2479 }
2480
2481 fn get_orders_for_ids(
2487 &self,
2488 client_order_ids: &AHashSet<ClientOrderId>,
2489 side: Option<OrderSide>,
2490 ) -> Vec<&OrderAny> {
2491 let side = side.unwrap_or(OrderSide::NoOrderSide);
2492 let mut orders = Vec::new();
2493
2494 for client_order_id in client_order_ids {
2495 let order = self
2496 .orders
2497 .get(client_order_id)
2498 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2499
2500 if side == OrderSide::NoOrderSide || side == order.order_side() {
2501 orders.push(order);
2502 }
2503 }
2504
2505 orders
2506 }
2507
2508 fn get_positions_for_ids(
2514 &self,
2515 position_ids: &AHashSet<PositionId>,
2516 side: Option<PositionSide>,
2517 ) -> Vec<&Position> {
2518 let side = side.unwrap_or(PositionSide::NoPositionSide);
2519 let mut positions = Vec::new();
2520
2521 for position_id in position_ids {
2522 let position = self
2523 .positions
2524 .get(position_id)
2525 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2526
2527 if side == PositionSide::NoPositionSide || side == position.side {
2528 positions.push(position);
2529 }
2530 }
2531
2532 positions
2533 }
2534
2535 #[must_use]
2537 pub fn client_order_ids(
2538 &self,
2539 venue: Option<&Venue>,
2540 instrument_id: Option<&InstrumentId>,
2541 strategy_id: Option<&StrategyId>,
2542 account_id: Option<&AccountId>,
2543 ) -> AHashSet<ClientOrderId> {
2544 let query =
2545 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2546 match query {
2547 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2548 None => self.index.orders.clone(),
2549 }
2550 }
2551
2552 #[must_use]
2554 pub fn client_order_ids_open(
2555 &self,
2556 venue: Option<&Venue>,
2557 instrument_id: Option<&InstrumentId>,
2558 strategy_id: Option<&StrategyId>,
2559 account_id: Option<&AccountId>,
2560 ) -> AHashSet<ClientOrderId> {
2561 let query =
2562 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2563 match query {
2564 Some(query) => self
2565 .index
2566 .orders_open
2567 .intersection(&query)
2568 .copied()
2569 .collect(),
2570 None => self.index.orders_open.clone(),
2571 }
2572 }
2573
2574 #[must_use]
2576 pub fn client_order_ids_closed(
2577 &self,
2578 venue: Option<&Venue>,
2579 instrument_id: Option<&InstrumentId>,
2580 strategy_id: Option<&StrategyId>,
2581 account_id: Option<&AccountId>,
2582 ) -> AHashSet<ClientOrderId> {
2583 let query =
2584 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2585 match query {
2586 Some(query) => self
2587 .index
2588 .orders_closed
2589 .intersection(&query)
2590 .copied()
2591 .collect(),
2592 None => self.index.orders_closed.clone(),
2593 }
2594 }
2595
2596 #[must_use]
2601 pub fn client_order_ids_active_local(
2602 &self,
2603 venue: Option<&Venue>,
2604 instrument_id: Option<&InstrumentId>,
2605 strategy_id: Option<&StrategyId>,
2606 account_id: Option<&AccountId>,
2607 ) -> AHashSet<ClientOrderId> {
2608 let query =
2609 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2610 match query {
2611 Some(query) => self
2612 .index
2613 .orders_active_local
2614 .intersection(&query)
2615 .copied()
2616 .collect(),
2617 None => self.index.orders_active_local.clone(),
2618 }
2619 }
2620
2621 #[must_use]
2623 pub fn client_order_ids_emulated(
2624 &self,
2625 venue: Option<&Venue>,
2626 instrument_id: Option<&InstrumentId>,
2627 strategy_id: Option<&StrategyId>,
2628 account_id: Option<&AccountId>,
2629 ) -> AHashSet<ClientOrderId> {
2630 let query =
2631 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2632 match query {
2633 Some(query) => self
2634 .index
2635 .orders_emulated
2636 .intersection(&query)
2637 .copied()
2638 .collect(),
2639 None => self.index.orders_emulated.clone(),
2640 }
2641 }
2642
2643 #[must_use]
2645 pub fn client_order_ids_inflight(
2646 &self,
2647 venue: Option<&Venue>,
2648 instrument_id: Option<&InstrumentId>,
2649 strategy_id: Option<&StrategyId>,
2650 account_id: Option<&AccountId>,
2651 ) -> AHashSet<ClientOrderId> {
2652 let query =
2653 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2654 match query {
2655 Some(query) => self
2656 .index
2657 .orders_inflight
2658 .intersection(&query)
2659 .copied()
2660 .collect(),
2661 None => self.index.orders_inflight.clone(),
2662 }
2663 }
2664
2665 #[must_use]
2667 pub fn position_ids(
2668 &self,
2669 venue: Option<&Venue>,
2670 instrument_id: Option<&InstrumentId>,
2671 strategy_id: Option<&StrategyId>,
2672 account_id: Option<&AccountId>,
2673 ) -> AHashSet<PositionId> {
2674 let query =
2675 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2676 match query {
2677 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2678 None => self.index.positions.clone(),
2679 }
2680 }
2681
2682 #[must_use]
2684 pub fn position_open_ids(
2685 &self,
2686 venue: Option<&Venue>,
2687 instrument_id: Option<&InstrumentId>,
2688 strategy_id: Option<&StrategyId>,
2689 account_id: Option<&AccountId>,
2690 ) -> AHashSet<PositionId> {
2691 let query =
2692 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2693 match query {
2694 Some(query) => self
2695 .index
2696 .positions_open
2697 .intersection(&query)
2698 .copied()
2699 .collect(),
2700 None => self.index.positions_open.clone(),
2701 }
2702 }
2703
2704 #[must_use]
2706 pub fn position_closed_ids(
2707 &self,
2708 venue: Option<&Venue>,
2709 instrument_id: Option<&InstrumentId>,
2710 strategy_id: Option<&StrategyId>,
2711 account_id: Option<&AccountId>,
2712 ) -> AHashSet<PositionId> {
2713 let query =
2714 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2715 match query {
2716 Some(query) => self
2717 .index
2718 .positions_closed
2719 .intersection(&query)
2720 .copied()
2721 .collect(),
2722 None => self.index.positions_closed.clone(),
2723 }
2724 }
2725
2726 #[must_use]
2728 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2729 self.index.actors.clone()
2730 }
2731
2732 #[must_use]
2734 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2735 self.index.strategies.clone()
2736 }
2737
2738 #[must_use]
2740 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2741 self.index.exec_algorithms.clone()
2742 }
2743
2744 #[must_use]
2748 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2749 self.orders.get(client_order_id)
2750 }
2751
2752 #[must_use]
2754 pub fn orders_for_ids(
2755 &self,
2756 client_order_ids: &[ClientOrderId],
2757 context: &dyn Display,
2758 ) -> Vec<OrderAny> {
2759 let mut orders = Vec::with_capacity(client_order_ids.len());
2760 for id in client_order_ids {
2761 match self.orders.get(id) {
2762 Some(order) => orders.push(order.clone()),
2763 None => log::error!("Order {id} not found in cache for {context}"),
2764 }
2765 }
2766 orders
2767 }
2768
2769 #[must_use]
2771 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2772 self.orders.get_mut(client_order_id)
2773 }
2774
2775 #[must_use]
2777 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2778 self.index.venue_order_ids.get(venue_order_id)
2779 }
2780
2781 #[must_use]
2783 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2784 self.index.client_order_ids.get(client_order_id)
2785 }
2786
2787 #[must_use]
2789 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2790 self.index.order_client.get(client_order_id)
2791 }
2792
2793 #[must_use]
2795 pub fn orders(
2796 &self,
2797 venue: Option<&Venue>,
2798 instrument_id: Option<&InstrumentId>,
2799 strategy_id: Option<&StrategyId>,
2800 account_id: Option<&AccountId>,
2801 side: Option<OrderSide>,
2802 ) -> Vec<&OrderAny> {
2803 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2804 self.get_orders_for_ids(&client_order_ids, side)
2805 }
2806
2807 #[must_use]
2809 pub fn orders_open(
2810 &self,
2811 venue: Option<&Venue>,
2812 instrument_id: Option<&InstrumentId>,
2813 strategy_id: Option<&StrategyId>,
2814 account_id: Option<&AccountId>,
2815 side: Option<OrderSide>,
2816 ) -> Vec<&OrderAny> {
2817 let client_order_ids =
2818 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2819 self.get_orders_for_ids(&client_order_ids, side)
2820 }
2821
2822 #[must_use]
2824 pub fn orders_closed(
2825 &self,
2826 venue: Option<&Venue>,
2827 instrument_id: Option<&InstrumentId>,
2828 strategy_id: Option<&StrategyId>,
2829 account_id: Option<&AccountId>,
2830 side: Option<OrderSide>,
2831 ) -> Vec<&OrderAny> {
2832 let client_order_ids =
2833 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2834 self.get_orders_for_ids(&client_order_ids, side)
2835 }
2836
2837 #[must_use]
2842 pub fn orders_active_local(
2843 &self,
2844 venue: Option<&Venue>,
2845 instrument_id: Option<&InstrumentId>,
2846 strategy_id: Option<&StrategyId>,
2847 account_id: Option<&AccountId>,
2848 side: Option<OrderSide>,
2849 ) -> Vec<&OrderAny> {
2850 let client_order_ids =
2851 self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
2852 self.get_orders_for_ids(&client_order_ids, side)
2853 }
2854
2855 #[must_use]
2857 pub fn orders_emulated(
2858 &self,
2859 venue: Option<&Venue>,
2860 instrument_id: Option<&InstrumentId>,
2861 strategy_id: Option<&StrategyId>,
2862 account_id: Option<&AccountId>,
2863 side: Option<OrderSide>,
2864 ) -> Vec<&OrderAny> {
2865 let client_order_ids =
2866 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2867 self.get_orders_for_ids(&client_order_ids, side)
2868 }
2869
2870 #[must_use]
2872 pub fn orders_inflight(
2873 &self,
2874 venue: Option<&Venue>,
2875 instrument_id: Option<&InstrumentId>,
2876 strategy_id: Option<&StrategyId>,
2877 account_id: Option<&AccountId>,
2878 side: Option<OrderSide>,
2879 ) -> Vec<&OrderAny> {
2880 let client_order_ids =
2881 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2882 self.get_orders_for_ids(&client_order_ids, side)
2883 }
2884
2885 #[must_use]
2887 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2888 let client_order_ids = self.index.position_orders.get(position_id);
2889 match client_order_ids {
2890 Some(client_order_ids) => {
2891 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2892 }
2893 None => Vec::new(),
2894 }
2895 }
2896
2897 #[must_use]
2899 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2900 self.index.orders.contains(client_order_id)
2901 }
2902
2903 #[must_use]
2905 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2906 self.index.orders_open.contains(client_order_id)
2907 }
2908
2909 #[must_use]
2911 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2912 self.index.orders_closed.contains(client_order_id)
2913 }
2914
2915 #[must_use]
2920 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
2921 self.index.orders_active_local.contains(client_order_id)
2922 }
2923
2924 #[must_use]
2926 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2927 self.index.orders_emulated.contains(client_order_id)
2928 }
2929
2930 #[must_use]
2932 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2933 self.index.orders_inflight.contains(client_order_id)
2934 }
2935
2936 #[must_use]
2938 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2939 self.index.orders_pending_cancel.contains(client_order_id)
2940 }
2941
2942 #[must_use]
2944 pub fn orders_open_count(
2945 &self,
2946 venue: Option<&Venue>,
2947 instrument_id: Option<&InstrumentId>,
2948 strategy_id: Option<&StrategyId>,
2949 account_id: Option<&AccountId>,
2950 side: Option<OrderSide>,
2951 ) -> usize {
2952 self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2953 .len()
2954 }
2955
2956 #[must_use]
2958 pub fn orders_closed_count(
2959 &self,
2960 venue: Option<&Venue>,
2961 instrument_id: Option<&InstrumentId>,
2962 strategy_id: Option<&StrategyId>,
2963 account_id: Option<&AccountId>,
2964 side: Option<OrderSide>,
2965 ) -> usize {
2966 self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2967 .len()
2968 }
2969
2970 #[must_use]
2975 pub fn orders_active_local_count(
2976 &self,
2977 venue: Option<&Venue>,
2978 instrument_id: Option<&InstrumentId>,
2979 strategy_id: Option<&StrategyId>,
2980 account_id: Option<&AccountId>,
2981 side: Option<OrderSide>,
2982 ) -> usize {
2983 self.orders_active_local(venue, instrument_id, strategy_id, account_id, side)
2984 .len()
2985 }
2986
2987 #[must_use]
2989 pub fn orders_emulated_count(
2990 &self,
2991 venue: Option<&Venue>,
2992 instrument_id: Option<&InstrumentId>,
2993 strategy_id: Option<&StrategyId>,
2994 account_id: Option<&AccountId>,
2995 side: Option<OrderSide>,
2996 ) -> usize {
2997 self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2998 .len()
2999 }
3000
3001 #[must_use]
3003 pub fn orders_inflight_count(
3004 &self,
3005 venue: Option<&Venue>,
3006 instrument_id: Option<&InstrumentId>,
3007 strategy_id: Option<&StrategyId>,
3008 account_id: Option<&AccountId>,
3009 side: Option<OrderSide>,
3010 ) -> usize {
3011 self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
3012 .len()
3013 }
3014
3015 #[must_use]
3017 pub fn orders_total_count(
3018 &self,
3019 venue: Option<&Venue>,
3020 instrument_id: Option<&InstrumentId>,
3021 strategy_id: Option<&StrategyId>,
3022 account_id: Option<&AccountId>,
3023 side: Option<OrderSide>,
3024 ) -> usize {
3025 self.orders(venue, instrument_id, strategy_id, account_id, side)
3026 .len()
3027 }
3028
3029 #[must_use]
3031 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
3032 self.order_lists.get(order_list_id)
3033 }
3034
3035 #[must_use]
3037 pub fn order_lists(
3038 &self,
3039 venue: Option<&Venue>,
3040 instrument_id: Option<&InstrumentId>,
3041 strategy_id: Option<&StrategyId>,
3042 account_id: Option<&AccountId>,
3043 ) -> Vec<&OrderList> {
3044 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
3045
3046 if let Some(venue) = venue {
3047 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
3048 }
3049
3050 if let Some(instrument_id) = instrument_id {
3051 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
3052 }
3053
3054 if let Some(strategy_id) = strategy_id {
3055 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
3056 }
3057
3058 if let Some(account_id) = account_id {
3059 order_lists.retain(|ol| {
3060 ol.client_order_ids.iter().any(|client_order_id| {
3061 self.orders
3062 .get(client_order_id)
3063 .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
3064 })
3065 });
3066 }
3067
3068 order_lists
3069 }
3070
3071 #[must_use]
3073 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
3074 self.order_lists.contains_key(order_list_id)
3075 }
3076
3077 #[must_use]
3082 pub fn orders_for_exec_algorithm(
3083 &self,
3084 exec_algorithm_id: &ExecAlgorithmId,
3085 venue: Option<&Venue>,
3086 instrument_id: Option<&InstrumentId>,
3087 strategy_id: Option<&StrategyId>,
3088 account_id: Option<&AccountId>,
3089 side: Option<OrderSide>,
3090 ) -> Vec<&OrderAny> {
3091 let query =
3092 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
3093 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
3094
3095 if let Some(query) = query
3096 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
3097 {
3098 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
3099 }
3100
3101 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
3102 self.get_orders_for_ids(exec_algorithm_order_ids, side)
3103 } else {
3104 Vec::new()
3105 }
3106 }
3107
3108 #[must_use]
3110 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
3111 self.get_orders_for_ids(
3112 self.index
3113 .exec_spawn_orders
3114 .get(exec_spawn_id)
3115 .unwrap_or(&AHashSet::new()),
3116 None,
3117 )
3118 }
3119
3120 #[must_use]
3122 pub fn exec_spawn_total_quantity(
3123 &self,
3124 exec_spawn_id: &ClientOrderId,
3125 active_only: bool,
3126 ) -> Option<Quantity> {
3127 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3128
3129 let mut total_quantity: Option<Quantity> = None;
3130
3131 for spawn_order in exec_spawn_orders {
3132 if active_only && spawn_order.is_closed() {
3133 continue;
3134 }
3135
3136 match total_quantity.as_mut() {
3137 Some(total) => *total = *total + spawn_order.quantity(),
3138 None => total_quantity = Some(spawn_order.quantity()),
3139 }
3140 }
3141
3142 total_quantity
3143 }
3144
3145 #[must_use]
3147 pub fn exec_spawn_total_filled_qty(
3148 &self,
3149 exec_spawn_id: &ClientOrderId,
3150 active_only: bool,
3151 ) -> Option<Quantity> {
3152 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3153
3154 let mut total_quantity: Option<Quantity> = None;
3155
3156 for spawn_order in exec_spawn_orders {
3157 if active_only && spawn_order.is_closed() {
3158 continue;
3159 }
3160
3161 match total_quantity.as_mut() {
3162 Some(total) => *total = *total + spawn_order.filled_qty(),
3163 None => total_quantity = Some(spawn_order.filled_qty()),
3164 }
3165 }
3166
3167 total_quantity
3168 }
3169
3170 #[must_use]
3172 pub fn exec_spawn_total_leaves_qty(
3173 &self,
3174 exec_spawn_id: &ClientOrderId,
3175 active_only: bool,
3176 ) -> Option<Quantity> {
3177 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3178
3179 let mut total_quantity: Option<Quantity> = None;
3180
3181 for spawn_order in exec_spawn_orders {
3182 if active_only && spawn_order.is_closed() {
3183 continue;
3184 }
3185
3186 match total_quantity.as_mut() {
3187 Some(total) => *total = *total + spawn_order.leaves_qty(),
3188 None => total_quantity = Some(spawn_order.leaves_qty()),
3189 }
3190 }
3191
3192 total_quantity
3193 }
3194
3195 #[must_use]
3199 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3200 self.positions.get(position_id)
3201 }
3202
3203 #[must_use]
3205 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3206 self.index
3207 .order_position
3208 .get(client_order_id)
3209 .and_then(|position_id| self.positions.get(position_id))
3210 }
3211
3212 #[must_use]
3214 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3215 self.index.order_position.get(client_order_id)
3216 }
3217
3218 #[must_use]
3220 pub fn positions(
3221 &self,
3222 venue: Option<&Venue>,
3223 instrument_id: Option<&InstrumentId>,
3224 strategy_id: Option<&StrategyId>,
3225 account_id: Option<&AccountId>,
3226 side: Option<PositionSide>,
3227 ) -> Vec<&Position> {
3228 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3229 self.get_positions_for_ids(&position_ids, side)
3230 }
3231
3232 #[must_use]
3234 pub fn positions_open(
3235 &self,
3236 venue: Option<&Venue>,
3237 instrument_id: Option<&InstrumentId>,
3238 strategy_id: Option<&StrategyId>,
3239 account_id: Option<&AccountId>,
3240 side: Option<PositionSide>,
3241 ) -> Vec<&Position> {
3242 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3243 self.get_positions_for_ids(&position_ids, side)
3244 }
3245
3246 #[must_use]
3248 pub fn positions_closed(
3249 &self,
3250 venue: Option<&Venue>,
3251 instrument_id: Option<&InstrumentId>,
3252 strategy_id: Option<&StrategyId>,
3253 account_id: Option<&AccountId>,
3254 side: Option<PositionSide>,
3255 ) -> Vec<&Position> {
3256 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3257 self.get_positions_for_ids(&position_ids, side)
3258 }
3259
3260 #[must_use]
3262 pub fn position_exists(&self, position_id: &PositionId) -> bool {
3263 self.index.positions.contains(position_id)
3264 }
3265
3266 #[must_use]
3268 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3269 self.index.positions_open.contains(position_id)
3270 }
3271
3272 #[must_use]
3274 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3275 self.index.positions_closed.contains(position_id)
3276 }
3277
3278 #[must_use]
3280 pub fn positions_open_count(
3281 &self,
3282 venue: Option<&Venue>,
3283 instrument_id: Option<&InstrumentId>,
3284 strategy_id: Option<&StrategyId>,
3285 account_id: Option<&AccountId>,
3286 side: Option<PositionSide>,
3287 ) -> usize {
3288 self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3289 .len()
3290 }
3291
3292 #[must_use]
3294 pub fn positions_closed_count(
3295 &self,
3296 venue: Option<&Venue>,
3297 instrument_id: Option<&InstrumentId>,
3298 strategy_id: Option<&StrategyId>,
3299 account_id: Option<&AccountId>,
3300 side: Option<PositionSide>,
3301 ) -> usize {
3302 self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3303 .len()
3304 }
3305
3306 #[must_use]
3308 pub fn positions_total_count(
3309 &self,
3310 venue: Option<&Venue>,
3311 instrument_id: Option<&InstrumentId>,
3312 strategy_id: Option<&StrategyId>,
3313 account_id: Option<&AccountId>,
3314 side: Option<PositionSide>,
3315 ) -> usize {
3316 self.positions(venue, instrument_id, strategy_id, account_id, side)
3317 .len()
3318 }
3319
3320 #[must_use]
3324 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3325 self.index.order_strategy.get(client_order_id)
3326 }
3327
3328 #[must_use]
3330 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3331 self.index.position_strategy.get(position_id)
3332 }
3333
3334 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3342 check_valid_string_ascii(key, stringify!(key))?;
3343
3344 Ok(self.general.get(key))
3345 }
3346
3347 #[must_use]
3351 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3352 match price_type {
3353 PriceType::Bid => self
3354 .quotes
3355 .get(instrument_id)
3356 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3357 PriceType::Ask => self
3358 .quotes
3359 .get(instrument_id)
3360 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3361 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3362 quotes.front().map(|quote| {
3363 Price::new(
3364 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3365 quote.bid_price.precision + 1,
3366 )
3367 })
3368 }),
3369 PriceType::Last => self
3370 .trades
3371 .get(instrument_id)
3372 .and_then(|trades| trades.front().map(|trade| trade.price)),
3373 PriceType::Mark => self
3374 .mark_prices
3375 .get(instrument_id)
3376 .and_then(|marks| marks.front().map(|mark| mark.value)),
3377 }
3378 }
3379
3380 #[must_use]
3382 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3383 self.quotes
3384 .get(instrument_id)
3385 .map(|quotes| quotes.iter().copied().collect())
3386 }
3387
3388 #[must_use]
3390 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3391 self.trades
3392 .get(instrument_id)
3393 .map(|trades| trades.iter().copied().collect())
3394 }
3395
3396 #[must_use]
3398 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3399 self.mark_prices
3400 .get(instrument_id)
3401 .map(|mark_prices| mark_prices.iter().copied().collect())
3402 }
3403
3404 #[must_use]
3406 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3407 self.index_prices
3408 .get(instrument_id)
3409 .map(|index_prices| index_prices.iter().copied().collect())
3410 }
3411
3412 #[must_use]
3414 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3415 self.funding_rates
3416 .get(instrument_id)
3417 .map(|funding_rates| funding_rates.iter().copied().collect())
3418 }
3419
3420 #[must_use]
3422 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3423 self.bars
3424 .get(bar_type)
3425 .map(|bars| bars.iter().copied().collect())
3426 }
3427
3428 #[must_use]
3430 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3431 self.books.get(instrument_id)
3432 }
3433
3434 #[must_use]
3436 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3437 self.books.get_mut(instrument_id)
3438 }
3439
3440 #[must_use]
3442 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3443 self.own_books.get(instrument_id)
3444 }
3445
3446 #[must_use]
3448 pub fn own_order_book_mut(
3449 &mut self,
3450 instrument_id: &InstrumentId,
3451 ) -> Option<&mut OwnOrderBook> {
3452 self.own_books.get_mut(instrument_id)
3453 }
3454
3455 #[must_use]
3457 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3458 self.quotes
3459 .get(instrument_id)
3460 .and_then(|quotes| quotes.front())
3461 }
3462
3463 #[must_use]
3467 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
3468 self.quotes
3469 .get(instrument_id)
3470 .and_then(|quotes| quotes.get(index))
3471 }
3472
3473 #[must_use]
3475 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3476 self.trades
3477 .get(instrument_id)
3478 .and_then(|trades| trades.front())
3479 }
3480
3481 #[must_use]
3485 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
3486 self.trades
3487 .get(instrument_id)
3488 .and_then(|trades| trades.get(index))
3489 }
3490
3491 #[must_use]
3493 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3494 self.mark_prices
3495 .get(instrument_id)
3496 .and_then(|mark_prices| mark_prices.front())
3497 }
3498
3499 #[must_use]
3501 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3502 self.index_prices
3503 .get(instrument_id)
3504 .and_then(|index_prices| index_prices.front())
3505 }
3506
3507 #[must_use]
3509 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3510 self.funding_rates
3511 .get(instrument_id)
3512 .and_then(|funding_rates| funding_rates.front())
3513 }
3514
3515 #[must_use]
3517 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3518 self.bars.get(bar_type).and_then(|bars| bars.front())
3519 }
3520
3521 #[must_use]
3525 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
3526 self.bars.get(bar_type).and_then(|bars| bars.get(index))
3527 }
3528
3529 #[must_use]
3531 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3532 self.books
3533 .get(instrument_id)
3534 .map_or(0, |book| book.update_count) as usize
3535 }
3536
3537 #[must_use]
3539 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3540 self.quotes
3541 .get(instrument_id)
3542 .map_or(0, std::collections::VecDeque::len)
3543 }
3544
3545 #[must_use]
3547 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3548 self.trades
3549 .get(instrument_id)
3550 .map_or(0, std::collections::VecDeque::len)
3551 }
3552
3553 #[must_use]
3555 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3556 self.bars
3557 .get(bar_type)
3558 .map_or(0, std::collections::VecDeque::len)
3559 }
3560
3561 #[must_use]
3563 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3564 self.books.contains_key(instrument_id)
3565 }
3566
3567 #[must_use]
3569 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3570 self.quote_count(instrument_id) > 0
3571 }
3572
3573 #[must_use]
3575 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3576 self.trade_count(instrument_id) > 0
3577 }
3578
3579 #[must_use]
3581 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3582 self.bar_count(bar_type) > 0
3583 }
3584
3585 #[must_use]
3586 pub fn get_xrate(
3587 &self,
3588 venue: Venue,
3589 from_currency: Currency,
3590 to_currency: Currency,
3591 price_type: PriceType,
3592 ) -> Option<f64> {
3593 if from_currency == to_currency {
3594 return Some(1.0);
3597 }
3598
3599 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3600
3601 match get_exchange_rate(
3602 from_currency.code,
3603 to_currency.code,
3604 price_type,
3605 bid_quote,
3606 ask_quote,
3607 ) {
3608 Ok(rate) => rate,
3609 Err(e) => {
3610 log::error!("Failed to calculate xrate: {e}");
3611 None
3612 }
3613 }
3614 }
3615
3616 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3617 let mut bid_quotes = AHashMap::new();
3618 let mut ask_quotes = AHashMap::new();
3619
3620 for instrument_id in self.instruments.keys() {
3621 if instrument_id.venue != *venue {
3622 continue;
3623 }
3624
3625 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3626 if let Some(tick) = ticks.front() {
3627 (tick.bid_price, tick.ask_price)
3628 } else {
3629 continue; }
3631 } else {
3632 let bid_bar = self
3633 .bars
3634 .iter()
3635 .find(|(k, _)| {
3636 k.instrument_id() == *instrument_id
3637 && matches!(k.spec().price_type, PriceType::Bid)
3638 })
3639 .map(|(_, v)| v);
3640
3641 let ask_bar = self
3642 .bars
3643 .iter()
3644 .find(|(k, _)| {
3645 k.instrument_id() == *instrument_id
3646 && matches!(k.spec().price_type, PriceType::Ask)
3647 })
3648 .map(|(_, v)| v);
3649
3650 match (bid_bar, ask_bar) {
3651 (Some(bid), Some(ask)) => {
3652 match (bid.front(), ask.front()) {
3653 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3654 _ => {
3655 continue;
3657 }
3658 }
3659 }
3660 _ => continue,
3661 }
3662 };
3663
3664 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3665 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3666 }
3667
3668 (bid_quotes, ask_quotes)
3669 }
3670
3671 #[must_use]
3673 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3674 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3675 }
3676
3677 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3683 assert!(xrate > 0.0, "xrate was zero");
3684 self.mark_xrates.insert((from_currency, to_currency), xrate);
3685 self.mark_xrates
3686 .insert((to_currency, from_currency), 1.0 / xrate);
3687 }
3688
3689 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3691 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3692 }
3693
3694 pub fn clear_mark_xrates(&mut self) {
3696 self.mark_xrates.clear();
3697 }
3698
3699 #[must_use]
3703 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3704 self.instruments.get(instrument_id)
3705 }
3706
3707 #[must_use]
3709 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3710 match venue {
3711 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3712 None => self.instruments.keys().collect(),
3713 }
3714 }
3715
3716 #[must_use]
3718 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3719 self.instruments
3720 .values()
3721 .filter(|i| &i.id().venue == venue)
3722 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3723 .collect()
3724 }
3725
3726 #[must_use]
3728 pub fn bar_types(
3729 &self,
3730 instrument_id: Option<&InstrumentId>,
3731 price_type: Option<&PriceType>,
3732 aggregation_source: AggregationSource,
3733 ) -> Vec<&BarType> {
3734 let mut bar_types = self
3735 .bars
3736 .keys()
3737 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3738 .collect::<Vec<&BarType>>();
3739
3740 if let Some(instrument_id) = instrument_id {
3741 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3742 }
3743
3744 if let Some(price_type) = price_type {
3745 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3746 }
3747
3748 bar_types
3749 }
3750
3751 #[must_use]
3755 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3756 self.synthetics.get(instrument_id)
3757 }
3758
3759 #[must_use]
3761 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3762 self.synthetics.keys().collect()
3763 }
3764
3765 #[must_use]
3767 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3768 self.synthetics.values().collect()
3769 }
3770
3771 #[must_use]
3775 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3776 self.accounts.get(account_id)
3777 }
3778
3779 #[must_use]
3781 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3782 self.index
3783 .venue_account
3784 .get(venue)
3785 .and_then(|account_id| self.accounts.get(account_id))
3786 }
3787
3788 #[must_use]
3790 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3791 self.index.venue_account.get(venue)
3792 }
3793
3794 #[must_use]
3796 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3797 self.accounts
3798 .values()
3799 .filter(|account| &account.id() == account_id)
3800 .collect()
3801 }
3802
3803 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3811 if !order.has_price() {
3812 return;
3813 }
3814
3815 let instrument_id = order.instrument_id();
3816
3817 let own_book = self
3818 .own_books
3819 .entry(instrument_id)
3820 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3821
3822 let own_book_order = order.to_own_book_order();
3823
3824 if order.is_closed() {
3825 if let Err(e) = own_book.delete(own_book_order) {
3826 log::debug!(
3827 "Failed to delete order {} from own book: {e}",
3828 order.client_order_id(),
3829 );
3830 } else {
3831 log::debug!("Deleted order {} from own book", order.client_order_id());
3832 }
3833 } else {
3834 if let Err(e) = own_book.update(own_book_order) {
3836 log::debug!(
3837 "Failed to update order {} in own book: {e}; inserting instead",
3838 order.client_order_id(),
3839 );
3840 own_book.add(own_book_order);
3841 }
3842 log::debug!("Updated order {} in own book", order.client_order_id());
3843 }
3844 }
3845
3846 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3852 let order = match self.orders.get(client_order_id) {
3853 Some(order) => order,
3854 None => return,
3855 };
3856
3857 self.index.orders_open.remove(client_order_id);
3858 self.index.orders_pending_cancel.remove(client_order_id);
3859 self.index.orders_inflight.remove(client_order_id);
3860 self.index.orders_emulated.remove(client_order_id);
3861 self.index.orders_active_local.remove(client_order_id);
3862
3863 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3864 && order.has_price()
3865 {
3866 let own_book_order = order.to_own_book_order();
3867 if let Err(e) = own_book.delete(own_book_order) {
3868 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3869 } else {
3870 log::debug!("Force deleted {client_order_id} from own book");
3871 }
3872 }
3873
3874 self.index.orders_closed.insert(*client_order_id);
3875 }
3876
3877 pub fn audit_own_order_books(&mut self) {
3884 log::debug!("Starting own books audit");
3885 let start = std::time::Instant::now();
3886
3887 let valid_order_ids: AHashSet<ClientOrderId> = self
3890 .index
3891 .orders_open
3892 .union(&self.index.orders_inflight)
3893 .copied()
3894 .collect();
3895
3896 for own_book in self.own_books.values_mut() {
3897 own_book.audit_open_orders(&valid_order_ids);
3898 }
3899
3900 log::debug!("Completed own books audit in {:?}", start.elapsed());
3901 }
3902}