1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31 collections::VecDeque,
32 fmt::{Debug, Display},
33 time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42 UUID4, UnixNanos,
43 correctness::{
44 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45 check_valid_string_ascii,
46 },
47 datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50 accounts::{Account, AccountAny},
51 data::{
52 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53 TradeTick, YieldCurveData,
54 },
55 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56 identifiers::{
57 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59 },
60 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61 orderbook::{
62 OrderBook,
63 own::{OwnOrderBook, should_handle_own_book_order},
64 },
65 orders::{Order, OrderAny, OrderList},
66 position::Position,
67 types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
73#[cfg_attr(
75 feature = "python",
76 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
77)]
78pub struct Cache {
79 config: CacheConfig,
80 index: CacheIndex,
81 database: Option<Box<dyn CacheDatabaseAdapter>>,
82 general: AHashMap<String, Bytes>,
83 currencies: AHashMap<Ustr, Currency>,
84 instruments: AHashMap<InstrumentId, InstrumentAny>,
85 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
86 books: AHashMap<InstrumentId, OrderBook>,
87 own_books: AHashMap<InstrumentId, OwnOrderBook>,
88 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
89 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
90 mark_xrates: AHashMap<(Currency, Currency), f64>,
91 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
92 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
93 funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
94 bars: AHashMap<BarType, VecDeque<Bar>>,
95 greeks: AHashMap<InstrumentId, GreeksData>,
96 yield_curves: AHashMap<String, YieldCurveData>,
97 accounts: AHashMap<AccountId, AccountAny>,
98 orders: AHashMap<ClientOrderId, OrderAny>,
99 order_lists: AHashMap<OrderListId, OrderList>,
100 positions: AHashMap<PositionId, Position>,
101 position_snapshots: AHashMap<PositionId, Bytes>,
102 #[cfg(feature = "defi")]
103 pub(crate) defi: crate::defi::cache::DefiCache,
104}
105
106impl Debug for Cache {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct(stringify!(Cache))
109 .field("config", &self.config)
110 .field("index", &self.index)
111 .field("general", &self.general)
112 .field("currencies", &self.currencies)
113 .field("instruments", &self.instruments)
114 .field("synthetics", &self.synthetics)
115 .field("books", &self.books)
116 .field("own_books", &self.own_books)
117 .field("quotes", &self.quotes)
118 .field("trades", &self.trades)
119 .field("mark_xrates", &self.mark_xrates)
120 .field("mark_prices", &self.mark_prices)
121 .field("index_prices", &self.index_prices)
122 .field("funding_rates", &self.funding_rates)
123 .field("bars", &self.bars)
124 .field("greeks", &self.greeks)
125 .field("yield_curves", &self.yield_curves)
126 .field("accounts", &self.accounts)
127 .field("orders", &self.orders)
128 .field("order_lists", &self.order_lists)
129 .field("positions", &self.positions)
130 .field("position_snapshots", &self.position_snapshots)
131 .finish()
132 }
133}
134
135impl Default for Cache {
136 fn default() -> Self {
138 Self::new(Some(CacheConfig::default()), None)
139 }
140}
141
142impl Cache {
143 #[must_use]
145 pub fn new(
149 config: Option<CacheConfig>,
150 database: Option<Box<dyn CacheDatabaseAdapter>>,
151 ) -> Self {
152 Self {
153 config: config.unwrap_or_default(),
154 index: CacheIndex::default(),
155 database,
156 general: AHashMap::new(),
157 currencies: AHashMap::new(),
158 instruments: AHashMap::new(),
159 synthetics: AHashMap::new(),
160 books: AHashMap::new(),
161 own_books: AHashMap::new(),
162 quotes: AHashMap::new(),
163 trades: AHashMap::new(),
164 mark_xrates: AHashMap::new(),
165 mark_prices: AHashMap::new(),
166 index_prices: AHashMap::new(),
167 funding_rates: AHashMap::new(),
168 bars: AHashMap::new(),
169 greeks: AHashMap::new(),
170 yield_curves: AHashMap::new(),
171 accounts: AHashMap::new(),
172 orders: AHashMap::new(),
173 order_lists: AHashMap::new(),
174 positions: AHashMap::new(),
175 position_snapshots: AHashMap::new(),
176 #[cfg(feature = "defi")]
177 defi: crate::defi::cache::DefiCache::default(),
178 }
179 }
180
181 #[must_use]
183 pub fn memory_address(&self) -> String {
184 format!("{:?}", std::ptr::from_ref(self))
185 }
186
187 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
191 let type_name = std::any::type_name_of_val(&*database);
192 log::info!("Cache database adapter set: {type_name}");
193 self.database = Some(database);
194 }
195
196 pub fn cache_general(&mut self) -> anyhow::Result<()> {
204 self.general = match &mut self.database {
205 Some(db) => db.load()?,
206 None => AHashMap::new(),
207 };
208
209 log::info!(
210 "Cached {} general object(s) from database",
211 self.general.len()
212 );
213 Ok(())
214 }
215
216 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
222 let cache_map = match &self.database {
223 Some(db) => db.load_all().await?,
224 None => CacheMap::default(),
225 };
226
227 self.currencies = cache_map.currencies;
228 self.instruments = cache_map.instruments;
229 self.synthetics = cache_map.synthetics;
230 self.accounts = cache_map.accounts;
231 self.orders = cache_map.orders;
232 self.positions = cache_map.positions;
233 Ok(())
234 }
235
236 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
242 self.currencies = match &mut self.database {
243 Some(db) => db.load_currencies().await?,
244 None => AHashMap::new(),
245 };
246
247 log::info!("Cached {} currencies from database", self.general.len());
248 Ok(())
249 }
250
251 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
257 self.instruments = match &mut self.database {
258 Some(db) => db.load_instruments().await?,
259 None => AHashMap::new(),
260 };
261
262 log::info!("Cached {} instruments from database", self.general.len());
263 Ok(())
264 }
265
266 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
272 self.synthetics = match &mut self.database {
273 Some(db) => db.load_synthetics().await?,
274 None => AHashMap::new(),
275 };
276
277 log::info!(
278 "Cached {} synthetic instruments from database",
279 self.general.len()
280 );
281 Ok(())
282 }
283
284 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
290 self.accounts = match &mut self.database {
291 Some(db) => db.load_accounts().await?,
292 None => AHashMap::new(),
293 };
294
295 log::info!(
296 "Cached {} synthetic instruments from database",
297 self.general.len()
298 );
299 Ok(())
300 }
301
302 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
308 self.orders = match &mut self.database {
309 Some(db) => db.load_orders().await?,
310 None => AHashMap::new(),
311 };
312
313 log::info!("Cached {} orders from database", self.general.len());
314 Ok(())
315 }
316
317 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
323 self.positions = match &mut self.database {
324 Some(db) => db.load_positions().await?,
325 None => AHashMap::new(),
326 };
327
328 log::info!("Cached {} positions from database", self.general.len());
329 Ok(())
330 }
331
332 pub fn build_index(&mut self) {
334 log::debug!("Building index");
335
336 for account_id in self.accounts.keys() {
338 self.index
339 .venue_account
340 .insert(account_id.get_issuer(), *account_id);
341 }
342
343 for (client_order_id, order) in &self.orders {
345 let instrument_id = order.instrument_id();
346 let venue = instrument_id.venue;
347 let strategy_id = order.strategy_id();
348
349 self.index
351 .venue_orders
352 .entry(venue)
353 .or_default()
354 .insert(*client_order_id);
355
356 if let Some(venue_order_id) = order.venue_order_id() {
358 self.index
359 .venue_order_ids
360 .insert(venue_order_id, *client_order_id);
361 }
362
363 if let Some(position_id) = order.position_id() {
365 self.index
366 .order_position
367 .insert(*client_order_id, position_id);
368 }
369
370 self.index
372 .order_strategy
373 .insert(*client_order_id, order.strategy_id());
374
375 self.index
377 .instrument_orders
378 .entry(instrument_id)
379 .or_default()
380 .insert(*client_order_id);
381
382 self.index
384 .strategy_orders
385 .entry(strategy_id)
386 .or_default()
387 .insert(*client_order_id);
388
389 if let Some(account_id) = order.account_id() {
391 self.index
392 .account_orders
393 .entry(account_id)
394 .or_default()
395 .insert(*client_order_id);
396 }
397
398 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
400 self.index
401 .exec_algorithm_orders
402 .entry(exec_algorithm_id)
403 .or_default()
404 .insert(*client_order_id);
405 }
406
407 if let Some(exec_spawn_id) = order.exec_spawn_id() {
409 self.index
410 .exec_spawn_orders
411 .entry(exec_spawn_id)
412 .or_default()
413 .insert(*client_order_id);
414 }
415
416 self.index.orders.insert(*client_order_id);
418
419 if order.is_open() {
421 self.index.orders_open.insert(*client_order_id);
422 }
423
424 if order.is_closed() {
426 self.index.orders_closed.insert(*client_order_id);
427 }
428
429 if let Some(emulation_trigger) = order.emulation_trigger()
431 && emulation_trigger != TriggerType::NoTrigger
432 && !order.is_closed()
433 {
434 self.index.orders_emulated.insert(*client_order_id);
435 }
436
437 if order.is_inflight() {
439 self.index.orders_inflight.insert(*client_order_id);
440 }
441
442 self.index.strategies.insert(strategy_id);
444
445 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
447 self.index.exec_algorithms.insert(exec_algorithm_id);
448 }
449 }
450
451 for (position_id, position) in &self.positions {
453 let instrument_id = position.instrument_id;
454 let venue = instrument_id.venue;
455 let strategy_id = position.strategy_id;
456
457 self.index
459 .venue_positions
460 .entry(venue)
461 .or_default()
462 .insert(*position_id);
463
464 self.index
466 .position_strategy
467 .insert(*position_id, position.strategy_id);
468
469 self.index
471 .position_orders
472 .entry(*position_id)
473 .or_default()
474 .extend(position.client_order_ids().into_iter());
475
476 self.index
478 .instrument_positions
479 .entry(instrument_id)
480 .or_default()
481 .insert(*position_id);
482
483 self.index
485 .strategy_positions
486 .entry(strategy_id)
487 .or_default()
488 .insert(*position_id);
489
490 self.index
492 .account_positions
493 .entry(position.account_id)
494 .or_default()
495 .insert(*position_id);
496
497 self.index.positions.insert(*position_id);
499
500 if position.is_open() {
502 self.index.positions_open.insert(*position_id);
503 }
504
505 if position.is_closed() {
507 self.index.positions_closed.insert(*position_id);
508 }
509
510 self.index.strategies.insert(strategy_id);
512 }
513 }
514
515 #[must_use]
517 pub const fn has_backing(&self) -> bool {
518 self.config.database.is_some()
519 }
520
521 #[must_use]
523 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
524 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
525 quote
526 } else {
527 log::warn!(
528 "Cannot calculate unrealized PnL for {}, no quotes for {}",
529 position.id,
530 position.instrument_id
531 );
532 return None;
533 };
534
535 let last = match position.side {
537 PositionSide::Flat | PositionSide::NoPositionSide => {
538 return Some(Money::new(0.0, position.settlement_currency));
539 }
540 PositionSide::Long => quote.bid_price,
541 PositionSide::Short => quote.ask_price,
542 };
543
544 Some(position.unrealized_pnl(last))
545 }
546
547 #[must_use]
556 pub fn check_integrity(&mut self) -> bool {
557 let mut error_count = 0;
558 let failure = "Integrity failure";
559
560 let timestamp_us = SystemTime::now()
562 .duration_since(UNIX_EPOCH)
563 .expect("Time went backwards")
564 .as_micros();
565
566 log::info!("Checking data integrity");
567
568 for account_id in self.accounts.keys() {
570 if !self
571 .index
572 .venue_account
573 .contains_key(&account_id.get_issuer())
574 {
575 log::error!(
576 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
577 );
578 error_count += 1;
579 }
580 }
581
582 for (client_order_id, order) in &self.orders {
583 if !self.index.order_strategy.contains_key(client_order_id) {
584 log::error!(
585 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
586 );
587 error_count += 1;
588 }
589
590 if !self.index.orders.contains(client_order_id) {
591 log::error!(
592 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
593 );
594 error_count += 1;
595 }
596
597 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
598 log::error!(
599 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
600 );
601 error_count += 1;
602 }
603
604 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
605 log::error!(
606 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
607 );
608 error_count += 1;
609 }
610
611 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
612 log::error!(
613 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
614 );
615 error_count += 1;
616 }
617
618 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
619 if !self
620 .index
621 .exec_algorithm_orders
622 .contains_key(&exec_algorithm_id)
623 {
624 log::error!(
625 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
626 );
627 error_count += 1;
628 }
629
630 if order.exec_spawn_id().is_none()
631 && !self.index.exec_spawn_orders.contains_key(client_order_id)
632 {
633 log::error!(
634 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
635 );
636 error_count += 1;
637 }
638 }
639 }
640
641 for (position_id, position) in &self.positions {
642 if !self.index.position_strategy.contains_key(position_id) {
643 log::error!(
644 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
645 );
646 error_count += 1;
647 }
648
649 if !self.index.position_orders.contains_key(position_id) {
650 log::error!(
651 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
652 );
653 error_count += 1;
654 }
655
656 if !self.index.positions.contains(position_id) {
657 log::error!(
658 "{failure} in positions: {position_id} not found in `self.index.positions`",
659 );
660 error_count += 1;
661 }
662
663 if position.is_open() && !self.index.positions_open.contains(position_id) {
664 log::error!(
665 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
666 );
667 error_count += 1;
668 }
669
670 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
671 log::error!(
672 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
673 );
674 error_count += 1;
675 }
676 }
677
678 for account_id in self.index.venue_account.values() {
680 if !self.accounts.contains_key(account_id) {
681 log::error!(
682 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
683 );
684 error_count += 1;
685 }
686 }
687
688 for client_order_id in self.index.venue_order_ids.values() {
689 if !self.orders.contains_key(client_order_id) {
690 log::error!(
691 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
692 );
693 error_count += 1;
694 }
695 }
696
697 for client_order_id in self.index.client_order_ids.keys() {
698 if !self.orders.contains_key(client_order_id) {
699 log::error!(
700 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
701 );
702 error_count += 1;
703 }
704 }
705
706 for client_order_id in self.index.order_position.keys() {
707 if !self.orders.contains_key(client_order_id) {
708 log::error!(
709 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
710 );
711 error_count += 1;
712 }
713 }
714
715 for client_order_id in self.index.order_strategy.keys() {
717 if !self.orders.contains_key(client_order_id) {
718 log::error!(
719 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
720 );
721 error_count += 1;
722 }
723 }
724
725 for position_id in self.index.position_strategy.keys() {
726 if !self.positions.contains_key(position_id) {
727 log::error!(
728 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
729 );
730 error_count += 1;
731 }
732 }
733
734 for position_id in self.index.position_orders.keys() {
735 if !self.positions.contains_key(position_id) {
736 log::error!(
737 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
738 );
739 error_count += 1;
740 }
741 }
742
743 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
744 for client_order_id in client_order_ids {
745 if !self.orders.contains_key(client_order_id) {
746 log::error!(
747 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
748 );
749 error_count += 1;
750 }
751 }
752 }
753
754 for instrument_id in self.index.instrument_positions.keys() {
755 if !self.index.instrument_orders.contains_key(instrument_id) {
756 log::error!(
757 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
758 );
759 error_count += 1;
760 }
761 }
762
763 for client_order_ids in self.index.strategy_orders.values() {
764 for client_order_id in client_order_ids {
765 if !self.orders.contains_key(client_order_id) {
766 log::error!(
767 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
768 );
769 error_count += 1;
770 }
771 }
772 }
773
774 for position_ids in self.index.strategy_positions.values() {
775 for position_id in position_ids {
776 if !self.positions.contains_key(position_id) {
777 log::error!(
778 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
779 );
780 error_count += 1;
781 }
782 }
783 }
784
785 for client_order_id in &self.index.orders {
786 if !self.orders.contains_key(client_order_id) {
787 log::error!(
788 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
789 );
790 error_count += 1;
791 }
792 }
793
794 for client_order_id in &self.index.orders_emulated {
795 if !self.orders.contains_key(client_order_id) {
796 log::error!(
797 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
798 );
799 error_count += 1;
800 }
801 }
802
803 for client_order_id in &self.index.orders_inflight {
804 if !self.orders.contains_key(client_order_id) {
805 log::error!(
806 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
807 );
808 error_count += 1;
809 }
810 }
811
812 for client_order_id in &self.index.orders_open {
813 if !self.orders.contains_key(client_order_id) {
814 log::error!(
815 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
816 );
817 error_count += 1;
818 }
819 }
820
821 for client_order_id in &self.index.orders_closed {
822 if !self.orders.contains_key(client_order_id) {
823 log::error!(
824 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
825 );
826 error_count += 1;
827 }
828 }
829
830 for position_id in &self.index.positions {
831 if !self.positions.contains_key(position_id) {
832 log::error!(
833 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
834 );
835 error_count += 1;
836 }
837 }
838
839 for position_id in &self.index.positions_open {
840 if !self.positions.contains_key(position_id) {
841 log::error!(
842 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
843 );
844 error_count += 1;
845 }
846 }
847
848 for position_id in &self.index.positions_closed {
849 if !self.positions.contains_key(position_id) {
850 log::error!(
851 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
852 );
853 error_count += 1;
854 }
855 }
856
857 for strategy_id in &self.index.strategies {
858 if !self.index.strategy_orders.contains_key(strategy_id) {
859 log::error!(
860 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
861 );
862 error_count += 1;
863 }
864 }
865
866 for exec_algorithm_id in &self.index.exec_algorithms {
867 if !self
868 .index
869 .exec_algorithm_orders
870 .contains_key(exec_algorithm_id)
871 {
872 log::error!(
873 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
874 );
875 error_count += 1;
876 }
877 }
878
879 let total_us = SystemTime::now()
880 .duration_since(UNIX_EPOCH)
881 .expect("Time went backwards")
882 .as_micros()
883 - timestamp_us;
884
885 if error_count == 0 {
886 log::info!("Integrity check passed in {total_us}μs");
887 true
888 } else {
889 log::error!(
890 "Integrity check failed with {error_count} error{} in {total_us}μs",
891 if error_count == 1 { "" } else { "s" },
892 );
893 false
894 }
895 }
896
897 #[must_use]
901 pub fn check_residuals(&self) -> bool {
902 log::debug!("Checking residuals");
903
904 let mut residuals = false;
905
906 for order in self.orders_open(None, None, None, None, None) {
908 residuals = true;
909 log::warn!("Residual {order}");
910 }
911
912 for position in self.positions_open(None, None, None, None, None) {
914 residuals = true;
915 log::warn!("Residual {position}");
916 }
917
918 residuals
919 }
920
921 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
927 log::debug!(
928 "Purging closed orders{}",
929 if buffer_secs > 0 {
930 format!(" with buffer_secs={buffer_secs}")
931 } else {
932 String::new()
933 }
934 );
935
936 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
937
938 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
939
940 'outer: for client_order_id in self.index.orders_closed.clone() {
941 if let Some(order) = self.orders.get(&client_order_id)
942 && order.is_closed()
943 && let Some(ts_closed) = order.ts_closed()
944 && ts_closed + buffer_ns <= ts_now
945 {
946 if let Some(linked_order_ids) = order.linked_order_ids() {
948 for linked_order_id in linked_order_ids {
949 if let Some(linked_order) = self.orders.get(linked_order_id)
950 && linked_order.is_open()
951 {
952 continue 'outer;
954 }
955 }
956 }
957
958 if let Some(order_list_id) = order.order_list_id() {
959 affected_order_list_ids.insert(order_list_id);
960 }
961
962 self.purge_order(client_order_id);
963 }
964 }
965
966 for order_list_id in affected_order_list_ids {
967 if let Some(order_list) = self.order_lists.get(&order_list_id) {
968 let all_purged = order_list
969 .client_order_ids
970 .iter()
971 .all(|id| !self.orders.contains_key(id));
972
973 if all_purged {
974 self.order_lists.remove(&order_list_id);
975 log::info!("Purged {order_list_id}");
976 }
977 }
978 }
979 }
980
981 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
983 log::debug!(
984 "Purging closed positions{}",
985 if buffer_secs > 0 {
986 format!(" with buffer_secs={buffer_secs}")
987 } else {
988 String::new()
989 }
990 );
991
992 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
993
994 for position_id in self.index.positions_closed.clone() {
995 if let Some(position) = self.positions.get(&position_id)
996 && position.is_closed()
997 && let Some(ts_closed) = position.ts_closed
998 && ts_closed + buffer_ns <= ts_now
999 {
1000 self.purge_position(position_id);
1001 }
1002 }
1003 }
1004
1005 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1009 let order = self.orders.get(&client_order_id).cloned();
1011
1012 if let Some(ref ord) = order
1014 && ord.is_open()
1015 {
1016 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1017 return;
1018 }
1019
1020 if let Some(ref ord) = order {
1022 self.orders.remove(&client_order_id);
1024
1025 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1027 {
1028 venue_orders.remove(&client_order_id);
1029 }
1030
1031 if let Some(venue_order_id) = ord.venue_order_id() {
1033 self.index.venue_order_ids.remove(&venue_order_id);
1034 }
1035
1036 if let Some(instrument_orders) =
1038 self.index.instrument_orders.get_mut(&ord.instrument_id())
1039 {
1040 instrument_orders.remove(&client_order_id);
1041 }
1042
1043 if let Some(position_id) = ord.position_id()
1045 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1046 {
1047 position_orders.remove(&client_order_id);
1048 }
1049
1050 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1052 && let Some(exec_algorithm_orders) =
1053 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1054 {
1055 exec_algorithm_orders.remove(&client_order_id);
1056 }
1057
1058 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1060 strategy_orders.remove(&client_order_id);
1061 if strategy_orders.is_empty() {
1062 self.index.strategy_orders.remove(&ord.strategy_id());
1063 }
1064 }
1065
1066 if let Some(account_id) = ord.account_id()
1068 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1069 {
1070 account_orders.remove(&client_order_id);
1071 if account_orders.is_empty() {
1072 self.index.account_orders.remove(&account_id);
1073 }
1074 }
1075
1076 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1078 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1079 {
1080 spawn_orders.remove(&client_order_id);
1081 if spawn_orders.is_empty() {
1082 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1083 }
1084 }
1085
1086 log::info!("Purged order {client_order_id}");
1087 } else {
1088 log::warn!("Order {client_order_id} not found when purging");
1089 }
1090
1091 self.index.order_position.remove(&client_order_id);
1093 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1094 self.index.order_client.remove(&client_order_id);
1095 self.index.client_order_ids.remove(&client_order_id);
1096
1097 if let Some(strategy_id) = strategy_id
1099 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1100 {
1101 strategy_orders.remove(&client_order_id);
1102 if strategy_orders.is_empty() {
1103 self.index.strategy_orders.remove(&strategy_id);
1104 }
1105 }
1106
1107 self.index.exec_spawn_orders.remove(&client_order_id);
1109
1110 self.index.orders.remove(&client_order_id);
1111 self.index.orders_open.remove(&client_order_id);
1112 self.index.orders_closed.remove(&client_order_id);
1113 self.index.orders_emulated.remove(&client_order_id);
1114 self.index.orders_inflight.remove(&client_order_id);
1115 self.index.orders_pending_cancel.remove(&client_order_id);
1116 }
1117
1118 pub fn purge_position(&mut self, position_id: PositionId) {
1122 let position = self.positions.get(&position_id).cloned();
1124
1125 if let Some(ref pos) = position
1127 && pos.is_open()
1128 {
1129 log::warn!("Position {position_id} found open when purging, skipping purge");
1130 return;
1131 }
1132
1133 if let Some(ref pos) = position {
1135 self.positions.remove(&position_id);
1136
1137 if let Some(venue_positions) =
1139 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1140 {
1141 venue_positions.remove(&position_id);
1142 }
1143
1144 if let Some(instrument_positions) =
1146 self.index.instrument_positions.get_mut(&pos.instrument_id)
1147 {
1148 instrument_positions.remove(&position_id);
1149 }
1150
1151 if let Some(strategy_positions) =
1153 self.index.strategy_positions.get_mut(&pos.strategy_id)
1154 {
1155 strategy_positions.remove(&position_id);
1156 }
1157
1158 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1160 account_positions.remove(&position_id);
1161 if account_positions.is_empty() {
1162 self.index.account_positions.remove(&pos.account_id);
1163 }
1164 }
1165
1166 for client_order_id in pos.client_order_ids() {
1168 self.index.order_position.remove(&client_order_id);
1169 }
1170
1171 log::info!("Purged position {position_id}");
1172 } else {
1173 log::warn!("Position {position_id} not found when purging");
1174 }
1175
1176 self.index.position_strategy.remove(&position_id);
1178 self.index.position_orders.remove(&position_id);
1179 self.index.positions.remove(&position_id);
1180 self.index.positions_open.remove(&position_id);
1181 self.index.positions_closed.remove(&position_id);
1182
1183 self.position_snapshots.remove(&position_id);
1185 }
1186
1187 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1192 log::debug!(
1193 "Purging account events{}",
1194 if lookback_secs > 0 {
1195 format!(" with lookback_secs={lookback_secs}")
1196 } else {
1197 String::new()
1198 }
1199 );
1200
1201 for account in self.accounts.values_mut() {
1202 let event_count = account.event_count();
1203 account.purge_account_events(ts_now, lookback_secs);
1204 let count_diff = event_count - account.event_count();
1205 if count_diff > 0 {
1206 log::info!(
1207 "Purged {} event(s) from account {}",
1208 count_diff,
1209 account.id()
1210 );
1211 }
1212 }
1213 }
1214
1215 pub fn clear_index(&mut self) {
1217 self.index.clear();
1218 log::debug!("Cleared index");
1219 }
1220
1221 pub fn reset(&mut self) {
1225 log::debug!("Resetting cache");
1226
1227 self.general.clear();
1228 self.currencies.clear();
1229 self.instruments.clear();
1230 self.synthetics.clear();
1231 self.books.clear();
1232 self.own_books.clear();
1233 self.quotes.clear();
1234 self.trades.clear();
1235 self.mark_xrates.clear();
1236 self.mark_prices.clear();
1237 self.index_prices.clear();
1238 self.funding_rates.clear();
1239 self.bars.clear();
1240 self.accounts.clear();
1241 self.orders.clear();
1242 self.order_lists.clear();
1243 self.positions.clear();
1244 self.position_snapshots.clear();
1245 self.greeks.clear();
1246 self.yield_curves.clear();
1247
1248 #[cfg(feature = "defi")]
1249 {
1250 self.defi.pools.clear();
1251 self.defi.pool_profilers.clear();
1252 }
1253
1254 self.clear_index();
1255
1256 log::info!("Reset cache");
1257 }
1258
1259 pub fn dispose(&mut self) {
1263 if let Some(database) = &mut self.database
1264 && let Err(e) = database.close()
1265 {
1266 log::error!("Failed to close database during dispose: {e}");
1267 }
1268 }
1269
1270 pub fn flush_db(&mut self) {
1274 if let Some(database) = &mut self.database
1275 && let Err(e) = database.flush()
1276 {
1277 log::error!("Failed to flush database: {e}");
1278 }
1279 }
1280
1281 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1289 check_valid_string_ascii(key, stringify!(key))?;
1290 check_predicate_false(value.is_empty(), stringify!(value))?;
1291
1292 log::debug!("Adding general {key}");
1293 self.general.insert(key.to_string(), value.clone());
1294
1295 if let Some(database) = &mut self.database {
1296 database.add(key.to_string(), value)?;
1297 }
1298 Ok(())
1299 }
1300
1301 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1307 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1308
1309 if self.config.save_market_data
1310 && let Some(database) = &mut self.database
1311 {
1312 database.add_order_book(&book)?;
1313 }
1314
1315 self.books.insert(book.instrument_id, book);
1316 Ok(())
1317 }
1318
1319 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1325 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1326
1327 self.own_books.insert(own_book.instrument_id, own_book);
1328 Ok(())
1329 }
1330
1331 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1337 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1338
1339 if self.config.save_market_data {
1340 }
1342
1343 let mark_prices_deque = self
1344 .mark_prices
1345 .entry(mark_price.instrument_id)
1346 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1347 mark_prices_deque.push_front(mark_price);
1348 Ok(())
1349 }
1350
1351 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1357 log::debug!(
1358 "Adding `IndexPriceUpdate` for {}",
1359 index_price.instrument_id
1360 );
1361
1362 if self.config.save_market_data {
1363 }
1365
1366 let index_prices_deque = self
1367 .index_prices
1368 .entry(index_price.instrument_id)
1369 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1370 index_prices_deque.push_front(index_price);
1371 Ok(())
1372 }
1373
1374 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1380 log::debug!(
1381 "Adding `FundingRateUpdate` for {}",
1382 funding_rate.instrument_id
1383 );
1384
1385 if self.config.save_market_data {
1386 }
1388
1389 let funding_rates_deque = self
1390 .funding_rates
1391 .entry(funding_rate.instrument_id)
1392 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1393 funding_rates_deque.push_front(funding_rate);
1394 Ok(())
1395 }
1396
1397 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1403 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1404
1405 let instrument_id = funding_rates[0].instrument_id;
1406 log::debug!(
1407 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1408 funding_rates.len()
1409 );
1410
1411 if self.config.save_market_data
1412 && let Some(database) = &mut self.database
1413 {
1414 for funding_rate in funding_rates {
1415 database.add_funding_rate(funding_rate)?;
1416 }
1417 }
1418
1419 let funding_rate_deque = self
1420 .funding_rates
1421 .entry(instrument_id)
1422 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1423
1424 for funding_rate in funding_rates {
1425 funding_rate_deque.push_front(*funding_rate);
1426 }
1427 Ok(())
1428 }
1429
1430 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1436 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1437
1438 if self.config.save_market_data
1439 && let Some(database) = &mut self.database
1440 {
1441 database.add_quote("e)?;
1442 }
1443
1444 let quotes_deque = self
1445 .quotes
1446 .entry(quote.instrument_id)
1447 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1448 quotes_deque.push_front(quote);
1449 Ok(())
1450 }
1451
1452 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1458 check_slice_not_empty(quotes, stringify!(quotes))?;
1459
1460 let instrument_id = quotes[0].instrument_id;
1461 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1462
1463 if self.config.save_market_data
1464 && let Some(database) = &mut self.database
1465 {
1466 for quote in quotes {
1467 database.add_quote(quote)?;
1468 }
1469 }
1470
1471 let quotes_deque = self
1472 .quotes
1473 .entry(instrument_id)
1474 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1475
1476 for quote in quotes {
1477 quotes_deque.push_front(*quote);
1478 }
1479 Ok(())
1480 }
1481
1482 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1488 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1489
1490 if self.config.save_market_data
1491 && let Some(database) = &mut self.database
1492 {
1493 database.add_trade(&trade)?;
1494 }
1495
1496 let trades_deque = self
1497 .trades
1498 .entry(trade.instrument_id)
1499 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1500 trades_deque.push_front(trade);
1501 Ok(())
1502 }
1503
1504 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1510 check_slice_not_empty(trades, stringify!(trades))?;
1511
1512 let instrument_id = trades[0].instrument_id;
1513 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1514
1515 if self.config.save_market_data
1516 && let Some(database) = &mut self.database
1517 {
1518 for trade in trades {
1519 database.add_trade(trade)?;
1520 }
1521 }
1522
1523 let trades_deque = self
1524 .trades
1525 .entry(instrument_id)
1526 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1527
1528 for trade in trades {
1529 trades_deque.push_front(*trade);
1530 }
1531 Ok(())
1532 }
1533
1534 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1540 log::debug!("Adding `Bar` {}", bar.bar_type);
1541
1542 if self.config.save_market_data
1543 && let Some(database) = &mut self.database
1544 {
1545 database.add_bar(&bar)?;
1546 }
1547
1548 let bars = self
1549 .bars
1550 .entry(bar.bar_type)
1551 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1552 bars.push_front(bar);
1553 Ok(())
1554 }
1555
1556 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1562 check_slice_not_empty(bars, stringify!(bars))?;
1563
1564 let bar_type = bars[0].bar_type;
1565 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1566
1567 if self.config.save_market_data
1568 && let Some(database) = &mut self.database
1569 {
1570 for bar in bars {
1571 database.add_bar(bar)?;
1572 }
1573 }
1574
1575 let bars_deque = self
1576 .bars
1577 .entry(bar_type)
1578 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1579
1580 for bar in bars {
1581 bars_deque.push_front(*bar);
1582 }
1583 Ok(())
1584 }
1585
1586 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1592 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1593
1594 if self.config.save_market_data
1595 && let Some(_database) = &mut self.database
1596 {
1597 }
1599
1600 self.greeks.insert(greeks.instrument_id, greeks);
1601 Ok(())
1602 }
1603
1604 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1606 self.greeks.get(instrument_id).cloned()
1607 }
1608
1609 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1615 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1616
1617 if self.config.save_market_data
1618 && let Some(_database) = &mut self.database
1619 {
1620 }
1622
1623 self.yield_curves
1624 .insert(yield_curve.curve_name.clone(), yield_curve);
1625 Ok(())
1626 }
1627
1628 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1630 self.yield_curves.get(key).map(|curve| {
1631 let curve_clone = curve.clone();
1632 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1633 as Box<dyn Fn(f64) -> f64>
1634 })
1635 }
1636
1637 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1643 if self.currencies.contains_key(¤cy.code) {
1644 return Ok(());
1645 }
1646 log::debug!("Adding `Currency` {}", currency.code);
1647
1648 if let Some(database) = &mut self.database {
1649 database.add_currency(¤cy)?;
1650 }
1651
1652 self.currencies.insert(currency.code, currency);
1653 Ok(())
1654 }
1655
1656 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1662 log::debug!("Adding `Instrument` {}", instrument.id());
1663
1664 if let Some(base_currency) = instrument.base_currency() {
1666 self.add_currency(base_currency)?;
1667 }
1668 self.add_currency(instrument.quote_currency())?;
1669 self.add_currency(instrument.settlement_currency())?;
1670
1671 if let Some(database) = &mut self.database {
1672 database.add_instrument(&instrument)?;
1673 }
1674
1675 self.instruments.insert(instrument.id(), instrument);
1676 Ok(())
1677 }
1678
1679 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1685 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1686
1687 if let Some(database) = &mut self.database {
1688 database.add_synthetic(&synthetic)?;
1689 }
1690
1691 self.synthetics.insert(synthetic.id, synthetic);
1692 Ok(())
1693 }
1694
1695 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1701 log::debug!("Adding `Account` {}", account.id());
1702
1703 if let Some(database) = &mut self.database {
1704 database.add_account(&account)?;
1705 }
1706
1707 let account_id = account.id();
1708 self.accounts.insert(account_id, account);
1709 self.index
1710 .venue_account
1711 .insert(account_id.get_issuer(), account_id);
1712 Ok(())
1713 }
1714
1715 pub fn add_venue_order_id(
1723 &mut self,
1724 client_order_id: &ClientOrderId,
1725 venue_order_id: &VenueOrderId,
1726 overwrite: bool,
1727 ) -> anyhow::Result<()> {
1728 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1729 && !overwrite
1730 && existing_venue_order_id != venue_order_id
1731 {
1732 anyhow::bail!(
1733 "Existing {existing_venue_order_id} for {client_order_id}
1734 did not match the given {venue_order_id}.
1735 If you are writing a test then try a different `venue_order_id`,
1736 otherwise this is probably a bug."
1737 );
1738 }
1739
1740 self.index
1741 .client_order_ids
1742 .insert(*client_order_id, *venue_order_id);
1743 self.index
1744 .venue_order_ids
1745 .insert(*venue_order_id, *client_order_id);
1746
1747 Ok(())
1748 }
1749
1750 pub fn add_order(
1762 &mut self,
1763 order: OrderAny,
1764 position_id: Option<PositionId>,
1765 client_id: Option<ClientId>,
1766 replace_existing: bool,
1767 ) -> anyhow::Result<()> {
1768 let instrument_id = order.instrument_id();
1769 let venue = instrument_id.venue;
1770 let client_order_id = order.client_order_id();
1771 let strategy_id = order.strategy_id();
1772 let exec_algorithm_id = order.exec_algorithm_id();
1773 let exec_spawn_id = order.exec_spawn_id();
1774
1775 if !replace_existing {
1776 check_key_not_in_map(
1777 &client_order_id,
1778 &self.orders,
1779 stringify!(client_order_id),
1780 stringify!(orders),
1781 )?;
1782 }
1783
1784 log::debug!("Adding {order:?}");
1785
1786 self.index.orders.insert(client_order_id);
1787 self.index
1788 .order_strategy
1789 .insert(client_order_id, strategy_id);
1790 self.index.strategies.insert(strategy_id);
1791
1792 self.index
1794 .venue_orders
1795 .entry(venue)
1796 .or_default()
1797 .insert(client_order_id);
1798
1799 self.index
1801 .instrument_orders
1802 .entry(instrument_id)
1803 .or_default()
1804 .insert(client_order_id);
1805
1806 self.index
1808 .strategy_orders
1809 .entry(strategy_id)
1810 .or_default()
1811 .insert(client_order_id);
1812
1813 if let Some(account_id) = order.account_id() {
1815 self.index
1816 .account_orders
1817 .entry(account_id)
1818 .or_default()
1819 .insert(client_order_id);
1820 }
1821
1822 if let Some(exec_algorithm_id) = exec_algorithm_id {
1824 self.index.exec_algorithms.insert(exec_algorithm_id);
1825
1826 self.index
1827 .exec_algorithm_orders
1828 .entry(exec_algorithm_id)
1829 .or_default()
1830 .insert(client_order_id);
1831 }
1832
1833 if let Some(exec_spawn_id) = exec_spawn_id {
1835 self.index
1836 .exec_spawn_orders
1837 .entry(exec_spawn_id)
1838 .or_default()
1839 .insert(client_order_id);
1840 }
1841
1842 if let Some(emulation_trigger) = order.emulation_trigger()
1844 && emulation_trigger != TriggerType::NoTrigger
1845 {
1846 self.index.orders_emulated.insert(client_order_id);
1847 }
1848
1849 if let Some(position_id) = position_id {
1851 self.add_position_id(
1852 &position_id,
1853 &order.instrument_id().venue,
1854 &client_order_id,
1855 &strategy_id,
1856 )?;
1857 }
1858
1859 if let Some(client_id) = client_id {
1861 self.index.order_client.insert(client_order_id, client_id);
1862 log::debug!("Indexed {client_id:?}");
1863 }
1864
1865 if let Some(database) = &mut self.database {
1866 database.add_order(&order, client_id)?;
1867 }
1872
1873 self.orders.insert(client_order_id, order);
1874
1875 Ok(())
1876 }
1877
1878 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1884 let order_list_id = order_list.id;
1885 check_key_not_in_map(
1886 &order_list_id,
1887 &self.order_lists,
1888 stringify!(order_list_id),
1889 stringify!(order_lists),
1890 )?;
1891
1892 log::debug!("Adding {order_list:?}");
1893 self.order_lists.insert(order_list_id, order_list);
1894 Ok(())
1895 }
1896
1897 pub fn add_position_id(
1903 &mut self,
1904 position_id: &PositionId,
1905 venue: &Venue,
1906 client_order_id: &ClientOrderId,
1907 strategy_id: &StrategyId,
1908 ) -> anyhow::Result<()> {
1909 self.index
1910 .order_position
1911 .insert(*client_order_id, *position_id);
1912
1913 if let Some(database) = &mut self.database {
1915 database.index_order_position(*client_order_id, *position_id)?;
1916 }
1917
1918 self.index
1920 .position_strategy
1921 .insert(*position_id, *strategy_id);
1922
1923 self.index
1925 .position_orders
1926 .entry(*position_id)
1927 .or_default()
1928 .insert(*client_order_id);
1929
1930 self.index
1932 .strategy_positions
1933 .entry(*strategy_id)
1934 .or_default()
1935 .insert(*position_id);
1936
1937 self.index
1939 .venue_positions
1940 .entry(*venue)
1941 .or_default()
1942 .insert(*position_id);
1943
1944 Ok(())
1945 }
1946
1947 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1953 self.positions.insert(position.id, position.clone());
1954 self.index.positions.insert(position.id);
1955 self.index.positions_open.insert(position.id);
1956 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
1959
1960 self.add_position_id(
1961 &position.id,
1962 &position.instrument_id.venue,
1963 &position.opening_order_id,
1964 &position.strategy_id,
1965 )?;
1966
1967 let venue = position.instrument_id.venue;
1968 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1969 venue_positions.insert(position.id);
1970
1971 let instrument_id = position.instrument_id;
1973 let instrument_positions = self
1974 .index
1975 .instrument_positions
1976 .entry(instrument_id)
1977 .or_default();
1978 instrument_positions.insert(position.id);
1979
1980 self.index
1982 .account_positions
1983 .entry(position.account_id)
1984 .or_default()
1985 .insert(position.id);
1986
1987 if let Some(database) = &mut self.database {
1988 database.add_position(&position)?;
1989 }
1998
1999 Ok(())
2000 }
2001
2002 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2008 let account_id = account.id();
2009 self.accounts.insert(account_id, account.clone());
2010
2011 if let Some(database) = &mut self.database {
2012 database.update_account(&account)?;
2013 }
2014 Ok(())
2015 }
2016
2017 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2023 let client_order_id = order.client_order_id();
2024
2025 if let Some(venue_order_id) = order.venue_order_id() {
2027 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2030 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2032 }
2033 }
2034
2035 if order.is_inflight() {
2037 self.index.orders_inflight.insert(client_order_id);
2038 } else {
2039 self.index.orders_inflight.remove(&client_order_id);
2040 }
2041
2042 if order.is_open() {
2044 self.index.orders_closed.remove(&client_order_id);
2045 self.index.orders_open.insert(client_order_id);
2046 } else if order.is_closed() {
2047 self.index.orders_open.remove(&client_order_id);
2048 self.index.orders_pending_cancel.remove(&client_order_id);
2049 self.index.orders_closed.insert(client_order_id);
2050 }
2051
2052 if let Some(emulation_trigger) = order.emulation_trigger()
2054 && emulation_trigger != TriggerType::NoTrigger
2055 && !order.is_closed()
2056 {
2057 self.index.orders_emulated.insert(client_order_id);
2058 } else {
2059 self.index.orders_emulated.remove(&client_order_id);
2060 }
2061
2062 if let Some(account_id) = order.account_id() {
2064 self.index
2065 .account_orders
2066 .entry(account_id)
2067 .or_default()
2068 .insert(client_order_id);
2069 }
2070
2071 if self.own_order_book(&order.instrument_id()).is_some()
2073 && should_handle_own_book_order(order)
2074 {
2075 self.update_own_order_book(order);
2076 }
2077
2078 if let Some(database) = &mut self.database {
2079 database.update_order(order.last_event())?;
2080 }
2085
2086 self.orders.insert(client_order_id, order.clone());
2088
2089 Ok(())
2090 }
2091
2092 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2094 self.index
2095 .orders_pending_cancel
2096 .insert(order.client_order_id());
2097 }
2098
2099 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2105 if position.is_open() {
2108 self.index.positions_open.insert(position.id);
2109 self.index.positions_closed.remove(&position.id);
2110 } else {
2111 self.index.positions_closed.insert(position.id);
2112 self.index.positions_open.remove(&position.id);
2113 }
2114
2115 if let Some(database) = &mut self.database {
2116 database.update_position(position)?;
2117 }
2122
2123 self.positions.insert(position.id, position.clone());
2124
2125 Ok(())
2126 }
2127
2128 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2135 let position_id = position.id;
2136
2137 let mut copied_position = position.clone();
2138 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2139 copied_position.id = PositionId::new(new_id);
2140
2141 let position_serialized = serde_json::to_vec(&copied_position)?;
2143
2144 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2145 let new_snapshots = match snapshots {
2146 Some(existing_snapshots) => {
2147 let mut combined = existing_snapshots.to_vec();
2148 combined.extend(position_serialized);
2149 Bytes::from(combined)
2150 }
2151 None => Bytes::from(position_serialized),
2152 };
2153 self.position_snapshots.insert(position_id, new_snapshots);
2154
2155 log::debug!("Snapshot {copied_position}");
2156 Ok(())
2157 }
2158
2159 pub fn snapshot_position_state(
2165 &mut self,
2166 position: &Position,
2167 open_only: Option<bool>,
2170 ) -> anyhow::Result<()> {
2171 let open_only = open_only.unwrap_or(true);
2172
2173 if open_only && !position.is_open() {
2174 return Ok(());
2175 }
2176
2177 if let Some(database) = &mut self.database {
2178 database.snapshot_position_state(position).map_err(|e| {
2179 log::error!(
2180 "Failed to snapshot position state for {}: {e:?}",
2181 position.id
2182 );
2183 e
2184 })?;
2185 } else {
2186 log::warn!(
2187 "Cannot snapshot position state for {} (no database configured)",
2188 position.id
2189 );
2190 }
2191
2192 todo!()
2194 }
2195
2196 #[must_use]
2198 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2199 if self.index.position_strategy.contains_key(position_id) {
2201 Some(OmsType::Netting)
2204 } else {
2205 None
2206 }
2207 }
2208
2209 #[must_use]
2211 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2212 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2213 }
2214
2215 #[must_use]
2217 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2218 let mut result = AHashSet::new();
2220 for (position_id, _) in &self.position_snapshots {
2221 if let Some(position) = self.positions.get(position_id)
2223 && position.instrument_id == *instrument_id
2224 {
2225 result.insert(*position_id);
2226 }
2227 }
2228 result
2229 }
2230
2231 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2237 let database = if let Some(database) = &self.database {
2238 database
2239 } else {
2240 log::warn!(
2241 "Cannot snapshot order state for {} (no database configured)",
2242 order.client_order_id()
2243 );
2244 return Ok(());
2245 };
2246
2247 database.snapshot_order_state(order)
2248 }
2249
2250 fn build_order_query_filter_set(
2253 &self,
2254 venue: Option<&Venue>,
2255 instrument_id: Option<&InstrumentId>,
2256 strategy_id: Option<&StrategyId>,
2257 account_id: Option<&AccountId>,
2258 ) -> Option<AHashSet<ClientOrderId>> {
2259 let mut query: Option<AHashSet<ClientOrderId>> = None;
2260
2261 if let Some(venue) = venue {
2262 query = Some(
2263 self.index
2264 .venue_orders
2265 .get(venue)
2266 .cloned()
2267 .unwrap_or_default(),
2268 );
2269 }
2270
2271 if let Some(instrument_id) = instrument_id {
2272 let instrument_orders = self
2273 .index
2274 .instrument_orders
2275 .get(instrument_id)
2276 .cloned()
2277 .unwrap_or_default();
2278
2279 if let Some(existing_query) = &mut query {
2280 *existing_query = existing_query
2281 .intersection(&instrument_orders)
2282 .copied()
2283 .collect();
2284 } else {
2285 query = Some(instrument_orders);
2286 }
2287 }
2288
2289 if let Some(strategy_id) = strategy_id {
2290 let strategy_orders = self
2291 .index
2292 .strategy_orders
2293 .get(strategy_id)
2294 .cloned()
2295 .unwrap_or_default();
2296
2297 if let Some(existing_query) = &mut query {
2298 *existing_query = existing_query
2299 .intersection(&strategy_orders)
2300 .copied()
2301 .collect();
2302 } else {
2303 query = Some(strategy_orders);
2304 }
2305 }
2306
2307 if let Some(account_id) = account_id {
2308 let account_orders = self
2309 .index
2310 .account_orders
2311 .get(account_id)
2312 .cloned()
2313 .unwrap_or_default();
2314
2315 if let Some(existing_query) = &mut query {
2316 *existing_query = existing_query
2317 .intersection(&account_orders)
2318 .copied()
2319 .collect();
2320 } else {
2321 query = Some(account_orders);
2322 }
2323 }
2324
2325 query
2326 }
2327
2328 fn build_position_query_filter_set(
2329 &self,
2330 venue: Option<&Venue>,
2331 instrument_id: Option<&InstrumentId>,
2332 strategy_id: Option<&StrategyId>,
2333 account_id: Option<&AccountId>,
2334 ) -> Option<AHashSet<PositionId>> {
2335 let mut query: Option<AHashSet<PositionId>> = None;
2336
2337 if let Some(venue) = venue {
2338 query = Some(
2339 self.index
2340 .venue_positions
2341 .get(venue)
2342 .cloned()
2343 .unwrap_or_default(),
2344 );
2345 }
2346
2347 if let Some(instrument_id) = instrument_id {
2348 let instrument_positions = self
2349 .index
2350 .instrument_positions
2351 .get(instrument_id)
2352 .cloned()
2353 .unwrap_or_default();
2354
2355 if let Some(existing_query) = query {
2356 query = Some(
2357 existing_query
2358 .intersection(&instrument_positions)
2359 .copied()
2360 .collect(),
2361 );
2362 } else {
2363 query = Some(instrument_positions);
2364 }
2365 }
2366
2367 if let Some(strategy_id) = strategy_id {
2368 let strategy_positions = self
2369 .index
2370 .strategy_positions
2371 .get(strategy_id)
2372 .cloned()
2373 .unwrap_or_default();
2374
2375 if let Some(existing_query) = query {
2376 query = Some(
2377 existing_query
2378 .intersection(&strategy_positions)
2379 .copied()
2380 .collect(),
2381 );
2382 } else {
2383 query = Some(strategy_positions);
2384 }
2385 }
2386
2387 if let Some(account_id) = account_id {
2388 let account_positions = self
2389 .index
2390 .account_positions
2391 .get(account_id)
2392 .cloned()
2393 .unwrap_or_default();
2394
2395 if let Some(existing_query) = query {
2396 query = Some(
2397 existing_query
2398 .intersection(&account_positions)
2399 .copied()
2400 .collect(),
2401 );
2402 } else {
2403 query = Some(account_positions);
2404 }
2405 }
2406
2407 query
2408 }
2409
2410 fn get_orders_for_ids(
2416 &self,
2417 client_order_ids: &AHashSet<ClientOrderId>,
2418 side: Option<OrderSide>,
2419 ) -> Vec<&OrderAny> {
2420 let side = side.unwrap_or(OrderSide::NoOrderSide);
2421 let mut orders = Vec::new();
2422
2423 for client_order_id in client_order_ids {
2424 let order = self
2425 .orders
2426 .get(client_order_id)
2427 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2428
2429 if side == OrderSide::NoOrderSide || side == order.order_side() {
2430 orders.push(order);
2431 }
2432 }
2433
2434 orders
2435 }
2436
2437 fn get_positions_for_ids(
2443 &self,
2444 position_ids: &AHashSet<PositionId>,
2445 side: Option<PositionSide>,
2446 ) -> Vec<&Position> {
2447 let side = side.unwrap_or(PositionSide::NoPositionSide);
2448 let mut positions = Vec::new();
2449
2450 for position_id in position_ids {
2451 let position = self
2452 .positions
2453 .get(position_id)
2454 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2455
2456 if side == PositionSide::NoPositionSide || side == position.side {
2457 positions.push(position);
2458 }
2459 }
2460
2461 positions
2462 }
2463
2464 #[must_use]
2466 pub fn client_order_ids(
2467 &self,
2468 venue: Option<&Venue>,
2469 instrument_id: Option<&InstrumentId>,
2470 strategy_id: Option<&StrategyId>,
2471 account_id: Option<&AccountId>,
2472 ) -> AHashSet<ClientOrderId> {
2473 let query =
2474 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2475 match query {
2476 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2477 None => self.index.orders.clone(),
2478 }
2479 }
2480
2481 #[must_use]
2483 pub fn client_order_ids_open(
2484 &self,
2485 venue: Option<&Venue>,
2486 instrument_id: Option<&InstrumentId>,
2487 strategy_id: Option<&StrategyId>,
2488 account_id: Option<&AccountId>,
2489 ) -> AHashSet<ClientOrderId> {
2490 let query =
2491 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2492 match query {
2493 Some(query) => self
2494 .index
2495 .orders_open
2496 .intersection(&query)
2497 .copied()
2498 .collect(),
2499 None => self.index.orders_open.clone(),
2500 }
2501 }
2502
2503 #[must_use]
2505 pub fn client_order_ids_closed(
2506 &self,
2507 venue: Option<&Venue>,
2508 instrument_id: Option<&InstrumentId>,
2509 strategy_id: Option<&StrategyId>,
2510 account_id: Option<&AccountId>,
2511 ) -> AHashSet<ClientOrderId> {
2512 let query =
2513 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2514 match query {
2515 Some(query) => self
2516 .index
2517 .orders_closed
2518 .intersection(&query)
2519 .copied()
2520 .collect(),
2521 None => self.index.orders_closed.clone(),
2522 }
2523 }
2524
2525 #[must_use]
2527 pub fn client_order_ids_emulated(
2528 &self,
2529 venue: Option<&Venue>,
2530 instrument_id: Option<&InstrumentId>,
2531 strategy_id: Option<&StrategyId>,
2532 account_id: Option<&AccountId>,
2533 ) -> AHashSet<ClientOrderId> {
2534 let query =
2535 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2536 match query {
2537 Some(query) => self
2538 .index
2539 .orders_emulated
2540 .intersection(&query)
2541 .copied()
2542 .collect(),
2543 None => self.index.orders_emulated.clone(),
2544 }
2545 }
2546
2547 #[must_use]
2549 pub fn client_order_ids_inflight(
2550 &self,
2551 venue: Option<&Venue>,
2552 instrument_id: Option<&InstrumentId>,
2553 strategy_id: Option<&StrategyId>,
2554 account_id: Option<&AccountId>,
2555 ) -> AHashSet<ClientOrderId> {
2556 let query =
2557 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2558 match query {
2559 Some(query) => self
2560 .index
2561 .orders_inflight
2562 .intersection(&query)
2563 .copied()
2564 .collect(),
2565 None => self.index.orders_inflight.clone(),
2566 }
2567 }
2568
2569 #[must_use]
2571 pub fn position_ids(
2572 &self,
2573 venue: Option<&Venue>,
2574 instrument_id: Option<&InstrumentId>,
2575 strategy_id: Option<&StrategyId>,
2576 account_id: Option<&AccountId>,
2577 ) -> AHashSet<PositionId> {
2578 let query =
2579 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2580 match query {
2581 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2582 None => self.index.positions.clone(),
2583 }
2584 }
2585
2586 #[must_use]
2588 pub fn position_open_ids(
2589 &self,
2590 venue: Option<&Venue>,
2591 instrument_id: Option<&InstrumentId>,
2592 strategy_id: Option<&StrategyId>,
2593 account_id: Option<&AccountId>,
2594 ) -> AHashSet<PositionId> {
2595 let query =
2596 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2597 match query {
2598 Some(query) => self
2599 .index
2600 .positions_open
2601 .intersection(&query)
2602 .copied()
2603 .collect(),
2604 None => self.index.positions_open.clone(),
2605 }
2606 }
2607
2608 #[must_use]
2610 pub fn position_closed_ids(
2611 &self,
2612 venue: Option<&Venue>,
2613 instrument_id: Option<&InstrumentId>,
2614 strategy_id: Option<&StrategyId>,
2615 account_id: Option<&AccountId>,
2616 ) -> AHashSet<PositionId> {
2617 let query =
2618 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2619 match query {
2620 Some(query) => self
2621 .index
2622 .positions_closed
2623 .intersection(&query)
2624 .copied()
2625 .collect(),
2626 None => self.index.positions_closed.clone(),
2627 }
2628 }
2629
2630 #[must_use]
2632 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2633 self.index.actors.clone()
2634 }
2635
2636 #[must_use]
2638 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2639 self.index.strategies.clone()
2640 }
2641
2642 #[must_use]
2644 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2645 self.index.exec_algorithms.clone()
2646 }
2647
2648 #[must_use]
2652 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2653 self.orders.get(client_order_id)
2654 }
2655
2656 #[must_use]
2658 pub fn orders_for_ids(
2659 &self,
2660 client_order_ids: &[ClientOrderId],
2661 context: &dyn Display,
2662 ) -> Vec<OrderAny> {
2663 let mut orders = Vec::with_capacity(client_order_ids.len());
2664 for id in client_order_ids {
2665 match self.orders.get(id) {
2666 Some(order) => orders.push(order.clone()),
2667 None => log::error!("Order {id} not found in cache for {context}"),
2668 }
2669 }
2670 orders
2671 }
2672
2673 #[must_use]
2675 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2676 self.orders.get_mut(client_order_id)
2677 }
2678
2679 #[must_use]
2681 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2682 self.index.venue_order_ids.get(venue_order_id)
2683 }
2684
2685 #[must_use]
2687 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2688 self.index.client_order_ids.get(client_order_id)
2689 }
2690
2691 #[must_use]
2693 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2694 self.index.order_client.get(client_order_id)
2695 }
2696
2697 #[must_use]
2699 pub fn orders(
2700 &self,
2701 venue: Option<&Venue>,
2702 instrument_id: Option<&InstrumentId>,
2703 strategy_id: Option<&StrategyId>,
2704 account_id: Option<&AccountId>,
2705 side: Option<OrderSide>,
2706 ) -> Vec<&OrderAny> {
2707 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2708 self.get_orders_for_ids(&client_order_ids, side)
2709 }
2710
2711 #[must_use]
2713 pub fn orders_open(
2714 &self,
2715 venue: Option<&Venue>,
2716 instrument_id: Option<&InstrumentId>,
2717 strategy_id: Option<&StrategyId>,
2718 account_id: Option<&AccountId>,
2719 side: Option<OrderSide>,
2720 ) -> Vec<&OrderAny> {
2721 let client_order_ids =
2722 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2723 self.get_orders_for_ids(&client_order_ids, side)
2724 }
2725
2726 #[must_use]
2728 pub fn orders_closed(
2729 &self,
2730 venue: Option<&Venue>,
2731 instrument_id: Option<&InstrumentId>,
2732 strategy_id: Option<&StrategyId>,
2733 account_id: Option<&AccountId>,
2734 side: Option<OrderSide>,
2735 ) -> Vec<&OrderAny> {
2736 let client_order_ids =
2737 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2738 self.get_orders_for_ids(&client_order_ids, side)
2739 }
2740
2741 #[must_use]
2743 pub fn orders_emulated(
2744 &self,
2745 venue: Option<&Venue>,
2746 instrument_id: Option<&InstrumentId>,
2747 strategy_id: Option<&StrategyId>,
2748 account_id: Option<&AccountId>,
2749 side: Option<OrderSide>,
2750 ) -> Vec<&OrderAny> {
2751 let client_order_ids =
2752 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2753 self.get_orders_for_ids(&client_order_ids, side)
2754 }
2755
2756 #[must_use]
2758 pub fn orders_inflight(
2759 &self,
2760 venue: Option<&Venue>,
2761 instrument_id: Option<&InstrumentId>,
2762 strategy_id: Option<&StrategyId>,
2763 account_id: Option<&AccountId>,
2764 side: Option<OrderSide>,
2765 ) -> Vec<&OrderAny> {
2766 let client_order_ids =
2767 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2768 self.get_orders_for_ids(&client_order_ids, side)
2769 }
2770
2771 #[must_use]
2773 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2774 let client_order_ids = self.index.position_orders.get(position_id);
2775 match client_order_ids {
2776 Some(client_order_ids) => {
2777 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2778 }
2779 None => Vec::new(),
2780 }
2781 }
2782
2783 #[must_use]
2785 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2786 self.index.orders.contains(client_order_id)
2787 }
2788
2789 #[must_use]
2791 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2792 self.index.orders_open.contains(client_order_id)
2793 }
2794
2795 #[must_use]
2797 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2798 self.index.orders_closed.contains(client_order_id)
2799 }
2800
2801 #[must_use]
2803 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2804 self.index.orders_emulated.contains(client_order_id)
2805 }
2806
2807 #[must_use]
2809 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2810 self.index.orders_inflight.contains(client_order_id)
2811 }
2812
2813 #[must_use]
2815 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2816 self.index.orders_pending_cancel.contains(client_order_id)
2817 }
2818
2819 #[must_use]
2821 pub fn orders_open_count(
2822 &self,
2823 venue: Option<&Venue>,
2824 instrument_id: Option<&InstrumentId>,
2825 strategy_id: Option<&StrategyId>,
2826 account_id: Option<&AccountId>,
2827 side: Option<OrderSide>,
2828 ) -> usize {
2829 self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2830 .len()
2831 }
2832
2833 #[must_use]
2835 pub fn orders_closed_count(
2836 &self,
2837 venue: Option<&Venue>,
2838 instrument_id: Option<&InstrumentId>,
2839 strategy_id: Option<&StrategyId>,
2840 account_id: Option<&AccountId>,
2841 side: Option<OrderSide>,
2842 ) -> usize {
2843 self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2844 .len()
2845 }
2846
2847 #[must_use]
2849 pub fn orders_emulated_count(
2850 &self,
2851 venue: Option<&Venue>,
2852 instrument_id: Option<&InstrumentId>,
2853 strategy_id: Option<&StrategyId>,
2854 account_id: Option<&AccountId>,
2855 side: Option<OrderSide>,
2856 ) -> usize {
2857 self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2858 .len()
2859 }
2860
2861 #[must_use]
2863 pub fn orders_inflight_count(
2864 &self,
2865 venue: Option<&Venue>,
2866 instrument_id: Option<&InstrumentId>,
2867 strategy_id: Option<&StrategyId>,
2868 account_id: Option<&AccountId>,
2869 side: Option<OrderSide>,
2870 ) -> usize {
2871 self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
2872 .len()
2873 }
2874
2875 #[must_use]
2877 pub fn orders_total_count(
2878 &self,
2879 venue: Option<&Venue>,
2880 instrument_id: Option<&InstrumentId>,
2881 strategy_id: Option<&StrategyId>,
2882 account_id: Option<&AccountId>,
2883 side: Option<OrderSide>,
2884 ) -> usize {
2885 self.orders(venue, instrument_id, strategy_id, account_id, side)
2886 .len()
2887 }
2888
2889 #[must_use]
2891 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2892 self.order_lists.get(order_list_id)
2893 }
2894
2895 #[must_use]
2897 pub fn order_lists(
2898 &self,
2899 venue: Option<&Venue>,
2900 instrument_id: Option<&InstrumentId>,
2901 strategy_id: Option<&StrategyId>,
2902 account_id: Option<&AccountId>,
2903 ) -> Vec<&OrderList> {
2904 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2905
2906 if let Some(venue) = venue {
2907 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2908 }
2909
2910 if let Some(instrument_id) = instrument_id {
2911 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2912 }
2913
2914 if let Some(strategy_id) = strategy_id {
2915 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2916 }
2917
2918 if let Some(account_id) = account_id {
2919 order_lists.retain(|ol| {
2920 ol.client_order_ids.iter().any(|client_order_id| {
2921 self.orders
2922 .get(client_order_id)
2923 .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
2924 })
2925 });
2926 }
2927
2928 order_lists
2929 }
2930
2931 #[must_use]
2933 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2934 self.order_lists.contains_key(order_list_id)
2935 }
2936
2937 #[must_use]
2942 pub fn orders_for_exec_algorithm(
2943 &self,
2944 exec_algorithm_id: &ExecAlgorithmId,
2945 venue: Option<&Venue>,
2946 instrument_id: Option<&InstrumentId>,
2947 strategy_id: Option<&StrategyId>,
2948 account_id: Option<&AccountId>,
2949 side: Option<OrderSide>,
2950 ) -> Vec<&OrderAny> {
2951 let query =
2952 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2953 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2954
2955 if let Some(query) = query
2956 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2957 {
2958 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2959 }
2960
2961 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2962 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2963 } else {
2964 Vec::new()
2965 }
2966 }
2967
2968 #[must_use]
2970 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2971 self.get_orders_for_ids(
2972 self.index
2973 .exec_spawn_orders
2974 .get(exec_spawn_id)
2975 .unwrap_or(&AHashSet::new()),
2976 None,
2977 )
2978 }
2979
2980 #[must_use]
2982 pub fn exec_spawn_total_quantity(
2983 &self,
2984 exec_spawn_id: &ClientOrderId,
2985 active_only: bool,
2986 ) -> Option<Quantity> {
2987 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2988
2989 let mut total_quantity: Option<Quantity> = None;
2990
2991 for spawn_order in exec_spawn_orders {
2992 if active_only && spawn_order.is_closed() {
2993 continue;
2994 }
2995
2996 match total_quantity.as_mut() {
2997 Some(total) => *total = *total + spawn_order.quantity(),
2998 None => total_quantity = Some(spawn_order.quantity()),
2999 }
3000 }
3001
3002 total_quantity
3003 }
3004
3005 #[must_use]
3007 pub fn exec_spawn_total_filled_qty(
3008 &self,
3009 exec_spawn_id: &ClientOrderId,
3010 active_only: bool,
3011 ) -> Option<Quantity> {
3012 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3013
3014 let mut total_quantity: Option<Quantity> = None;
3015
3016 for spawn_order in exec_spawn_orders {
3017 if active_only && spawn_order.is_closed() {
3018 continue;
3019 }
3020
3021 match total_quantity.as_mut() {
3022 Some(total) => *total = *total + spawn_order.filled_qty(),
3023 None => total_quantity = Some(spawn_order.filled_qty()),
3024 }
3025 }
3026
3027 total_quantity
3028 }
3029
3030 #[must_use]
3032 pub fn exec_spawn_total_leaves_qty(
3033 &self,
3034 exec_spawn_id: &ClientOrderId,
3035 active_only: bool,
3036 ) -> Option<Quantity> {
3037 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3038
3039 let mut total_quantity: Option<Quantity> = None;
3040
3041 for spawn_order in exec_spawn_orders {
3042 if active_only && spawn_order.is_closed() {
3043 continue;
3044 }
3045
3046 match total_quantity.as_mut() {
3047 Some(total) => *total = *total + spawn_order.leaves_qty(),
3048 None => total_quantity = Some(spawn_order.leaves_qty()),
3049 }
3050 }
3051
3052 total_quantity
3053 }
3054
3055 #[must_use]
3059 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3060 self.positions.get(position_id)
3061 }
3062
3063 #[must_use]
3065 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3066 self.index
3067 .order_position
3068 .get(client_order_id)
3069 .and_then(|position_id| self.positions.get(position_id))
3070 }
3071
3072 #[must_use]
3074 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3075 self.index.order_position.get(client_order_id)
3076 }
3077
3078 #[must_use]
3080 pub fn positions(
3081 &self,
3082 venue: Option<&Venue>,
3083 instrument_id: Option<&InstrumentId>,
3084 strategy_id: Option<&StrategyId>,
3085 account_id: Option<&AccountId>,
3086 side: Option<PositionSide>,
3087 ) -> Vec<&Position> {
3088 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3089 self.get_positions_for_ids(&position_ids, side)
3090 }
3091
3092 #[must_use]
3094 pub fn positions_open(
3095 &self,
3096 venue: Option<&Venue>,
3097 instrument_id: Option<&InstrumentId>,
3098 strategy_id: Option<&StrategyId>,
3099 account_id: Option<&AccountId>,
3100 side: Option<PositionSide>,
3101 ) -> Vec<&Position> {
3102 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3103 self.get_positions_for_ids(&position_ids, side)
3104 }
3105
3106 #[must_use]
3108 pub fn positions_closed(
3109 &self,
3110 venue: Option<&Venue>,
3111 instrument_id: Option<&InstrumentId>,
3112 strategy_id: Option<&StrategyId>,
3113 account_id: Option<&AccountId>,
3114 side: Option<PositionSide>,
3115 ) -> Vec<&Position> {
3116 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3117 self.get_positions_for_ids(&position_ids, side)
3118 }
3119
3120 #[must_use]
3122 pub fn position_exists(&self, position_id: &PositionId) -> bool {
3123 self.index.positions.contains(position_id)
3124 }
3125
3126 #[must_use]
3128 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3129 self.index.positions_open.contains(position_id)
3130 }
3131
3132 #[must_use]
3134 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3135 self.index.positions_closed.contains(position_id)
3136 }
3137
3138 #[must_use]
3140 pub fn positions_open_count(
3141 &self,
3142 venue: Option<&Venue>,
3143 instrument_id: Option<&InstrumentId>,
3144 strategy_id: Option<&StrategyId>,
3145 account_id: Option<&AccountId>,
3146 side: Option<PositionSide>,
3147 ) -> usize {
3148 self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3149 .len()
3150 }
3151
3152 #[must_use]
3154 pub fn positions_closed_count(
3155 &self,
3156 venue: Option<&Venue>,
3157 instrument_id: Option<&InstrumentId>,
3158 strategy_id: Option<&StrategyId>,
3159 account_id: Option<&AccountId>,
3160 side: Option<PositionSide>,
3161 ) -> usize {
3162 self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3163 .len()
3164 }
3165
3166 #[must_use]
3168 pub fn positions_total_count(
3169 &self,
3170 venue: Option<&Venue>,
3171 instrument_id: Option<&InstrumentId>,
3172 strategy_id: Option<&StrategyId>,
3173 account_id: Option<&AccountId>,
3174 side: Option<PositionSide>,
3175 ) -> usize {
3176 self.positions(venue, instrument_id, strategy_id, account_id, side)
3177 .len()
3178 }
3179
3180 #[must_use]
3184 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3185 self.index.order_strategy.get(client_order_id)
3186 }
3187
3188 #[must_use]
3190 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3191 self.index.position_strategy.get(position_id)
3192 }
3193
3194 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3202 check_valid_string_ascii(key, stringify!(key))?;
3203
3204 Ok(self.general.get(key))
3205 }
3206
3207 #[must_use]
3211 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3212 match price_type {
3213 PriceType::Bid => self
3214 .quotes
3215 .get(instrument_id)
3216 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3217 PriceType::Ask => self
3218 .quotes
3219 .get(instrument_id)
3220 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3221 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3222 quotes.front().map(|quote| {
3223 Price::new(
3224 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3225 quote.bid_price.precision + 1,
3226 )
3227 })
3228 }),
3229 PriceType::Last => self
3230 .trades
3231 .get(instrument_id)
3232 .and_then(|trades| trades.front().map(|trade| trade.price)),
3233 PriceType::Mark => self
3234 .mark_prices
3235 .get(instrument_id)
3236 .and_then(|marks| marks.front().map(|mark| mark.value)),
3237 }
3238 }
3239
3240 #[must_use]
3242 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3243 self.quotes
3244 .get(instrument_id)
3245 .map(|quotes| quotes.iter().copied().collect())
3246 }
3247
3248 #[must_use]
3250 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3251 self.trades
3252 .get(instrument_id)
3253 .map(|trades| trades.iter().copied().collect())
3254 }
3255
3256 #[must_use]
3258 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3259 self.mark_prices
3260 .get(instrument_id)
3261 .map(|mark_prices| mark_prices.iter().copied().collect())
3262 }
3263
3264 #[must_use]
3266 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3267 self.index_prices
3268 .get(instrument_id)
3269 .map(|index_prices| index_prices.iter().copied().collect())
3270 }
3271
3272 #[must_use]
3274 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3275 self.funding_rates
3276 .get(instrument_id)
3277 .map(|funding_rates| funding_rates.iter().copied().collect())
3278 }
3279
3280 #[must_use]
3282 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3283 self.bars
3284 .get(bar_type)
3285 .map(|bars| bars.iter().copied().collect())
3286 }
3287
3288 #[must_use]
3290 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3291 self.books.get(instrument_id)
3292 }
3293
3294 #[must_use]
3296 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3297 self.books.get_mut(instrument_id)
3298 }
3299
3300 #[must_use]
3302 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3303 self.own_books.get(instrument_id)
3304 }
3305
3306 #[must_use]
3308 pub fn own_order_book_mut(
3309 &mut self,
3310 instrument_id: &InstrumentId,
3311 ) -> Option<&mut OwnOrderBook> {
3312 self.own_books.get_mut(instrument_id)
3313 }
3314
3315 #[must_use]
3317 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3318 self.quotes
3319 .get(instrument_id)
3320 .and_then(|quotes| quotes.front())
3321 }
3322
3323 #[must_use]
3325 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3326 self.trades
3327 .get(instrument_id)
3328 .and_then(|trades| trades.front())
3329 }
3330
3331 #[must_use]
3333 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3334 self.mark_prices
3335 .get(instrument_id)
3336 .and_then(|mark_prices| mark_prices.front())
3337 }
3338
3339 #[must_use]
3341 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3342 self.index_prices
3343 .get(instrument_id)
3344 .and_then(|index_prices| index_prices.front())
3345 }
3346
3347 #[must_use]
3349 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3350 self.funding_rates
3351 .get(instrument_id)
3352 .and_then(|funding_rates| funding_rates.front())
3353 }
3354
3355 #[must_use]
3357 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3358 self.bars.get(bar_type).and_then(|bars| bars.front())
3359 }
3360
3361 #[must_use]
3363 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3364 self.books
3365 .get(instrument_id)
3366 .map_or(0, |book| book.update_count) as usize
3367 }
3368
3369 #[must_use]
3371 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3372 self.quotes
3373 .get(instrument_id)
3374 .map_or(0, std::collections::VecDeque::len)
3375 }
3376
3377 #[must_use]
3379 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3380 self.trades
3381 .get(instrument_id)
3382 .map_or(0, std::collections::VecDeque::len)
3383 }
3384
3385 #[must_use]
3387 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3388 self.bars
3389 .get(bar_type)
3390 .map_or(0, std::collections::VecDeque::len)
3391 }
3392
3393 #[must_use]
3395 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3396 self.books.contains_key(instrument_id)
3397 }
3398
3399 #[must_use]
3401 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3402 self.quote_count(instrument_id) > 0
3403 }
3404
3405 #[must_use]
3407 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3408 self.trade_count(instrument_id) > 0
3409 }
3410
3411 #[must_use]
3413 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3414 self.bar_count(bar_type) > 0
3415 }
3416
3417 #[must_use]
3418 pub fn get_xrate(
3419 &self,
3420 venue: Venue,
3421 from_currency: Currency,
3422 to_currency: Currency,
3423 price_type: PriceType,
3424 ) -> Option<f64> {
3425 if from_currency == to_currency {
3426 return Some(1.0);
3429 }
3430
3431 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3432
3433 match get_exchange_rate(
3434 from_currency.code,
3435 to_currency.code,
3436 price_type,
3437 bid_quote,
3438 ask_quote,
3439 ) {
3440 Ok(rate) => rate,
3441 Err(e) => {
3442 log::error!("Failed to calculate xrate: {e}");
3443 None
3444 }
3445 }
3446 }
3447
3448 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3449 let mut bid_quotes = AHashMap::new();
3450 let mut ask_quotes = AHashMap::new();
3451
3452 for instrument_id in self.instruments.keys() {
3453 if instrument_id.venue != *venue {
3454 continue;
3455 }
3456
3457 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3458 if let Some(tick) = ticks.front() {
3459 (tick.bid_price, tick.ask_price)
3460 } else {
3461 continue; }
3463 } else {
3464 let bid_bar = self
3465 .bars
3466 .iter()
3467 .find(|(k, _)| {
3468 k.instrument_id() == *instrument_id
3469 && matches!(k.spec().price_type, PriceType::Bid)
3470 })
3471 .map(|(_, v)| v);
3472
3473 let ask_bar = self
3474 .bars
3475 .iter()
3476 .find(|(k, _)| {
3477 k.instrument_id() == *instrument_id
3478 && matches!(k.spec().price_type, PriceType::Ask)
3479 })
3480 .map(|(_, v)| v);
3481
3482 match (bid_bar, ask_bar) {
3483 (Some(bid), Some(ask)) => {
3484 match (bid.front(), ask.front()) {
3485 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3486 _ => {
3487 continue;
3489 }
3490 }
3491 }
3492 _ => continue,
3493 }
3494 };
3495
3496 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3497 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3498 }
3499
3500 (bid_quotes, ask_quotes)
3501 }
3502
3503 #[must_use]
3505 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3506 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3507 }
3508
3509 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3515 assert!(xrate > 0.0, "xrate was zero");
3516 self.mark_xrates.insert((from_currency, to_currency), xrate);
3517 self.mark_xrates
3518 .insert((to_currency, from_currency), 1.0 / xrate);
3519 }
3520
3521 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3523 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3524 }
3525
3526 pub fn clear_mark_xrates(&mut self) {
3528 self.mark_xrates.clear();
3529 }
3530
3531 #[must_use]
3535 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3536 self.instruments.get(instrument_id)
3537 }
3538
3539 #[must_use]
3541 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3542 match venue {
3543 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3544 None => self.instruments.keys().collect(),
3545 }
3546 }
3547
3548 #[must_use]
3550 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3551 self.instruments
3552 .values()
3553 .filter(|i| &i.id().venue == venue)
3554 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3555 .collect()
3556 }
3557
3558 #[must_use]
3560 pub fn bar_types(
3561 &self,
3562 instrument_id: Option<&InstrumentId>,
3563 price_type: Option<&PriceType>,
3564 aggregation_source: AggregationSource,
3565 ) -> Vec<&BarType> {
3566 let mut bar_types = self
3567 .bars
3568 .keys()
3569 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3570 .collect::<Vec<&BarType>>();
3571
3572 if let Some(instrument_id) = instrument_id {
3573 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3574 }
3575
3576 if let Some(price_type) = price_type {
3577 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3578 }
3579
3580 bar_types
3581 }
3582
3583 #[must_use]
3587 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3588 self.synthetics.get(instrument_id)
3589 }
3590
3591 #[must_use]
3593 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3594 self.synthetics.keys().collect()
3595 }
3596
3597 #[must_use]
3599 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3600 self.synthetics.values().collect()
3601 }
3602
3603 #[must_use]
3607 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3608 self.accounts.get(account_id)
3609 }
3610
3611 #[must_use]
3613 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3614 self.index
3615 .venue_account
3616 .get(venue)
3617 .and_then(|account_id| self.accounts.get(account_id))
3618 }
3619
3620 #[must_use]
3622 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3623 self.index.venue_account.get(venue)
3624 }
3625
3626 #[must_use]
3628 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3629 self.accounts
3630 .values()
3631 .filter(|account| &account.id() == account_id)
3632 .collect()
3633 }
3634
3635 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3643 if !order.has_price() {
3644 return;
3645 }
3646
3647 let instrument_id = order.instrument_id();
3648
3649 let own_book = self
3650 .own_books
3651 .entry(instrument_id)
3652 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3653
3654 let own_book_order = order.to_own_book_order();
3655
3656 if order.is_closed() {
3657 if let Err(e) = own_book.delete(own_book_order) {
3658 log::debug!(
3659 "Failed to delete order {} from own book: {e}",
3660 order.client_order_id(),
3661 );
3662 } else {
3663 log::debug!("Deleted order {} from own book", order.client_order_id());
3664 }
3665 } else {
3666 if let Err(e) = own_book.update(own_book_order) {
3668 log::debug!(
3669 "Failed to update order {} in own book: {e}; inserting instead",
3670 order.client_order_id(),
3671 );
3672 own_book.add(own_book_order);
3673 }
3674 log::debug!("Updated order {} in own book", order.client_order_id());
3675 }
3676 }
3677
3678 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3684 let order = match self.orders.get(client_order_id) {
3685 Some(order) => order,
3686 None => return,
3687 };
3688
3689 self.index.orders_open.remove(client_order_id);
3690 self.index.orders_pending_cancel.remove(client_order_id);
3691 self.index.orders_inflight.remove(client_order_id);
3692 self.index.orders_emulated.remove(client_order_id);
3693
3694 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3695 && order.has_price()
3696 {
3697 let own_book_order = order.to_own_book_order();
3698 if let Err(e) = own_book.delete(own_book_order) {
3699 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3700 } else {
3701 log::debug!("Force deleted {client_order_id} from own book");
3702 }
3703 }
3704
3705 self.index.orders_closed.insert(*client_order_id);
3706 }
3707
3708 pub fn audit_own_order_books(&mut self) {
3715 log::debug!("Starting own books audit");
3716 let start = std::time::Instant::now();
3717
3718 let valid_order_ids: AHashSet<ClientOrderId> = self
3721 .index
3722 .orders_open
3723 .union(&self.index.orders_inflight)
3724 .copied()
3725 .collect();
3726
3727 for own_book in self.own_books.values_mut() {
3728 own_book.audit_open_orders(&valid_order_ids);
3729 }
3730
3731 log::debug!("Completed own books audit in {:?}", start.elapsed());
3732 }
3733}