1pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30 collections::VecDeque,
31 fmt::Debug,
32 time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41 UUID4, UnixNanos,
42 correctness::{
43 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44 check_valid_string_ascii,
45 },
46 datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49 accounts::{Account, AccountAny},
50 data::{
51 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52 TradeTick, YieldCurveData,
53 },
54 enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55 identifiers::{
56 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58 },
59 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60 orderbook::{
61 OrderBook,
62 own::{OwnOrderBook, should_handle_own_book_order},
63 },
64 orders::{Order, OrderAny, OrderList},
65 position::Position,
66 types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
72#[cfg_attr(
74 feature = "python",
75 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
76)]
77pub struct Cache {
78 config: CacheConfig,
79 index: CacheIndex,
80 database: Option<Box<dyn CacheDatabaseAdapter>>,
81 general: AHashMap<String, Bytes>,
82 currencies: AHashMap<Ustr, Currency>,
83 instruments: AHashMap<InstrumentId, InstrumentAny>,
84 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
85 books: AHashMap<InstrumentId, OrderBook>,
86 own_books: AHashMap<InstrumentId, OwnOrderBook>,
87 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
88 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
89 mark_xrates: AHashMap<(Currency, Currency), f64>,
90 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
91 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
92 funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
93 bars: AHashMap<BarType, VecDeque<Bar>>,
94 greeks: AHashMap<InstrumentId, GreeksData>,
95 yield_curves: AHashMap<String, YieldCurveData>,
96 accounts: AHashMap<AccountId, AccountAny>,
97 orders: AHashMap<ClientOrderId, OrderAny>,
98 order_lists: AHashMap<OrderListId, OrderList>,
99 positions: AHashMap<PositionId, Position>,
100 position_snapshots: AHashMap<PositionId, Bytes>,
101 #[cfg(feature = "defi")]
102 pub(crate) defi: crate::defi::cache::DefiCache,
103}
104
105impl Debug for Cache {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct(stringify!(Cache))
108 .field("config", &self.config)
109 .field("index", &self.index)
110 .field("general", &self.general)
111 .field("currencies", &self.currencies)
112 .field("instruments", &self.instruments)
113 .field("synthetics", &self.synthetics)
114 .field("books", &self.books)
115 .field("own_books", &self.own_books)
116 .field("quotes", &self.quotes)
117 .field("trades", &self.trades)
118 .field("mark_xrates", &self.mark_xrates)
119 .field("mark_prices", &self.mark_prices)
120 .field("index_prices", &self.index_prices)
121 .field("funding_rates", &self.funding_rates)
122 .field("bars", &self.bars)
123 .field("greeks", &self.greeks)
124 .field("yield_curves", &self.yield_curves)
125 .field("accounts", &self.accounts)
126 .field("orders", &self.orders)
127 .field("order_lists", &self.order_lists)
128 .field("positions", &self.positions)
129 .field("position_snapshots", &self.position_snapshots)
130 .finish()
131 }
132}
133
134impl Default for Cache {
135 fn default() -> Self {
137 Self::new(Some(CacheConfig::default()), None)
138 }
139}
140
141impl Cache {
142 #[must_use]
144 pub fn new(
148 config: Option<CacheConfig>,
149 database: Option<Box<dyn CacheDatabaseAdapter>>,
150 ) -> Self {
151 Self {
152 config: config.unwrap_or_default(),
153 index: CacheIndex::default(),
154 database,
155 general: AHashMap::new(),
156 currencies: AHashMap::new(),
157 instruments: AHashMap::new(),
158 synthetics: AHashMap::new(),
159 books: AHashMap::new(),
160 own_books: AHashMap::new(),
161 quotes: AHashMap::new(),
162 trades: AHashMap::new(),
163 mark_xrates: AHashMap::new(),
164 mark_prices: AHashMap::new(),
165 index_prices: AHashMap::new(),
166 funding_rates: AHashMap::new(),
167 bars: AHashMap::new(),
168 greeks: AHashMap::new(),
169 yield_curves: AHashMap::new(),
170 accounts: AHashMap::new(),
171 orders: AHashMap::new(),
172 order_lists: AHashMap::new(),
173 positions: AHashMap::new(),
174 position_snapshots: AHashMap::new(),
175 #[cfg(feature = "defi")]
176 defi: crate::defi::cache::DefiCache::default(),
177 }
178 }
179
180 #[must_use]
182 pub fn memory_address(&self) -> String {
183 format!("{:?}", std::ptr::from_ref(self))
184 }
185
186 pub fn cache_general(&mut self) -> anyhow::Result<()> {
194 self.general = match &mut self.database {
195 Some(db) => db.load()?,
196 None => AHashMap::new(),
197 };
198
199 log::info!(
200 "Cached {} general object(s) from database",
201 self.general.len()
202 );
203 Ok(())
204 }
205
206 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212 let cache_map = match &self.database {
213 Some(db) => db.load_all().await?,
214 None => CacheMap::default(),
215 };
216
217 self.currencies = cache_map.currencies;
218 self.instruments = cache_map.instruments;
219 self.synthetics = cache_map.synthetics;
220 self.accounts = cache_map.accounts;
221 self.orders = cache_map.orders;
222 self.positions = cache_map.positions;
223 Ok(())
224 }
225
226 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232 self.currencies = match &mut self.database {
233 Some(db) => db.load_currencies().await?,
234 None => AHashMap::new(),
235 };
236
237 log::info!("Cached {} currencies from database", self.general.len());
238 Ok(())
239 }
240
241 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247 self.instruments = match &mut self.database {
248 Some(db) => db.load_instruments().await?,
249 None => AHashMap::new(),
250 };
251
252 log::info!("Cached {} instruments from database", self.general.len());
253 Ok(())
254 }
255
256 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262 self.synthetics = match &mut self.database {
263 Some(db) => db.load_synthetics().await?,
264 None => AHashMap::new(),
265 };
266
267 log::info!(
268 "Cached {} synthetic instruments from database",
269 self.general.len()
270 );
271 Ok(())
272 }
273
274 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280 self.accounts = match &mut self.database {
281 Some(db) => db.load_accounts().await?,
282 None => AHashMap::new(),
283 };
284
285 log::info!(
286 "Cached {} synthetic instruments from database",
287 self.general.len()
288 );
289 Ok(())
290 }
291
292 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298 self.orders = match &mut self.database {
299 Some(db) => db.load_orders().await?,
300 None => AHashMap::new(),
301 };
302
303 log::info!("Cached {} orders from database", self.general.len());
304 Ok(())
305 }
306
307 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313 self.positions = match &mut self.database {
314 Some(db) => db.load_positions().await?,
315 None => AHashMap::new(),
316 };
317
318 log::info!("Cached {} positions from database", self.general.len());
319 Ok(())
320 }
321
322 pub fn build_index(&mut self) {
324 log::debug!("Building index");
325
326 for account_id in self.accounts.keys() {
328 self.index
329 .venue_account
330 .insert(account_id.get_issuer(), *account_id);
331 }
332
333 for (client_order_id, order) in &self.orders {
335 let instrument_id = order.instrument_id();
336 let venue = instrument_id.venue;
337 let strategy_id = order.strategy_id();
338
339 self.index
341 .venue_orders
342 .entry(venue)
343 .or_default()
344 .insert(*client_order_id);
345
346 if let Some(venue_order_id) = order.venue_order_id() {
348 self.index
349 .venue_order_ids
350 .insert(venue_order_id, *client_order_id);
351 }
352
353 if let Some(position_id) = order.position_id() {
355 self.index
356 .order_position
357 .insert(*client_order_id, position_id);
358 }
359
360 self.index
362 .order_strategy
363 .insert(*client_order_id, order.strategy_id());
364
365 self.index
367 .instrument_orders
368 .entry(instrument_id)
369 .or_default()
370 .insert(*client_order_id);
371
372 self.index
374 .strategy_orders
375 .entry(strategy_id)
376 .or_default()
377 .insert(*client_order_id);
378
379 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381 self.index
382 .exec_algorithm_orders
383 .entry(exec_algorithm_id)
384 .or_default()
385 .insert(*client_order_id);
386 }
387
388 if let Some(exec_spawn_id) = order.exec_spawn_id() {
390 self.index
391 .exec_spawn_orders
392 .entry(exec_spawn_id)
393 .or_default()
394 .insert(*client_order_id);
395 }
396
397 self.index.orders.insert(*client_order_id);
399
400 if order.is_open() {
402 self.index.orders_open.insert(*client_order_id);
403 }
404
405 if order.is_closed() {
407 self.index.orders_closed.insert(*client_order_id);
408 }
409
410 if let Some(emulation_trigger) = order.emulation_trigger()
412 && emulation_trigger != TriggerType::NoTrigger
413 && !order.is_closed()
414 {
415 self.index.orders_emulated.insert(*client_order_id);
416 }
417
418 if order.is_inflight() {
420 self.index.orders_inflight.insert(*client_order_id);
421 }
422
423 self.index.strategies.insert(strategy_id);
425
426 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428 self.index.exec_algorithms.insert(exec_algorithm_id);
429 }
430 }
431
432 for (position_id, position) in &self.positions {
434 let instrument_id = position.instrument_id;
435 let venue = instrument_id.venue;
436 let strategy_id = position.strategy_id;
437
438 self.index
440 .venue_positions
441 .entry(venue)
442 .or_default()
443 .insert(*position_id);
444
445 self.index
447 .position_strategy
448 .insert(*position_id, position.strategy_id);
449
450 self.index
452 .position_orders
453 .entry(*position_id)
454 .or_default()
455 .extend(position.client_order_ids().into_iter());
456
457 self.index
459 .instrument_positions
460 .entry(instrument_id)
461 .or_default()
462 .insert(*position_id);
463
464 self.index
466 .strategy_positions
467 .entry(strategy_id)
468 .or_default()
469 .insert(*position_id);
470
471 self.index.positions.insert(*position_id);
473
474 if position.is_open() {
476 self.index.positions_open.insert(*position_id);
477 }
478
479 if position.is_closed() {
481 self.index.positions_closed.insert(*position_id);
482 }
483
484 self.index.strategies.insert(strategy_id);
486 }
487 }
488
489 #[must_use]
491 pub const fn has_backing(&self) -> bool {
492 self.config.database.is_some()
493 }
494
495 #[must_use]
497 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499 quote
500 } else {
501 log::warn!(
502 "Cannot calculate unrealized PnL for {}, no quotes for {}",
503 position.id,
504 position.instrument_id
505 );
506 return None;
507 };
508
509 let last = match position.side {
511 PositionSide::Flat | PositionSide::NoPositionSide => {
512 return Some(Money::new(0.0, position.settlement_currency));
513 }
514 PositionSide::Long => quote.bid_price,
515 PositionSide::Short => quote.ask_price,
516 };
517
518 Some(position.unrealized_pnl(last))
519 }
520
521 #[must_use]
530 pub fn check_integrity(&mut self) -> bool {
531 let mut error_count = 0;
532 let failure = "Integrity failure";
533
534 let timestamp_us = SystemTime::now()
536 .duration_since(UNIX_EPOCH)
537 .expect("Time went backwards")
538 .as_micros();
539
540 log::info!("Checking data integrity");
541
542 for account_id in self.accounts.keys() {
544 if !self
545 .index
546 .venue_account
547 .contains_key(&account_id.get_issuer())
548 {
549 log::error!(
550 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
551 );
552 error_count += 1;
553 }
554 }
555
556 for (client_order_id, order) in &self.orders {
557 if !self.index.order_strategy.contains_key(client_order_id) {
558 log::error!(
559 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
560 );
561 error_count += 1;
562 }
563 if !self.index.orders.contains(client_order_id) {
564 log::error!(
565 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
566 );
567 error_count += 1;
568 }
569 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
570 log::error!(
571 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
572 );
573 error_count += 1;
574 }
575 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
576 log::error!(
577 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
578 );
579 error_count += 1;
580 }
581 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
582 log::error!(
583 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
584 );
585 error_count += 1;
586 }
587 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
588 if !self
589 .index
590 .exec_algorithm_orders
591 .contains_key(&exec_algorithm_id)
592 {
593 log::error!(
594 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
595 );
596 error_count += 1;
597 }
598 if order.exec_spawn_id().is_none()
599 && !self.index.exec_spawn_orders.contains_key(client_order_id)
600 {
601 log::error!(
602 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
603 );
604 error_count += 1;
605 }
606 }
607 }
608
609 for (position_id, position) in &self.positions {
610 if !self.index.position_strategy.contains_key(position_id) {
611 log::error!(
612 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
613 );
614 error_count += 1;
615 }
616 if !self.index.position_orders.contains_key(position_id) {
617 log::error!(
618 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
619 );
620 error_count += 1;
621 }
622 if !self.index.positions.contains(position_id) {
623 log::error!(
624 "{failure} in positions: {position_id} not found in `self.index.positions`",
625 );
626 error_count += 1;
627 }
628 if position.is_open() && !self.index.positions_open.contains(position_id) {
629 log::error!(
630 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
631 );
632 error_count += 1;
633 }
634 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
635 log::error!(
636 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
637 );
638 error_count += 1;
639 }
640 }
641
642 for account_id in self.index.venue_account.values() {
644 if !self.accounts.contains_key(account_id) {
645 log::error!(
646 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
647 );
648 error_count += 1;
649 }
650 }
651
652 for client_order_id in self.index.venue_order_ids.values() {
653 if !self.orders.contains_key(client_order_id) {
654 log::error!(
655 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
656 );
657 error_count += 1;
658 }
659 }
660
661 for client_order_id in self.index.client_order_ids.keys() {
662 if !self.orders.contains_key(client_order_id) {
663 log::error!(
664 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
665 );
666 error_count += 1;
667 }
668 }
669
670 for client_order_id in self.index.order_position.keys() {
671 if !self.orders.contains_key(client_order_id) {
672 log::error!(
673 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
674 );
675 error_count += 1;
676 }
677 }
678
679 for client_order_id in self.index.order_strategy.keys() {
681 if !self.orders.contains_key(client_order_id) {
682 log::error!(
683 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
684 );
685 error_count += 1;
686 }
687 }
688
689 for position_id in self.index.position_strategy.keys() {
690 if !self.positions.contains_key(position_id) {
691 log::error!(
692 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
693 );
694 error_count += 1;
695 }
696 }
697
698 for position_id in self.index.position_orders.keys() {
699 if !self.positions.contains_key(position_id) {
700 log::error!(
701 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
702 );
703 error_count += 1;
704 }
705 }
706
707 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
708 for client_order_id in client_order_ids {
709 if !self.orders.contains_key(client_order_id) {
710 log::error!(
711 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
712 );
713 error_count += 1;
714 }
715 }
716 }
717
718 for instrument_id in self.index.instrument_positions.keys() {
719 if !self.index.instrument_orders.contains_key(instrument_id) {
720 log::error!(
721 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
722 );
723 error_count += 1;
724 }
725 }
726
727 for client_order_ids in self.index.strategy_orders.values() {
728 for client_order_id in client_order_ids {
729 if !self.orders.contains_key(client_order_id) {
730 log::error!(
731 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
732 );
733 error_count += 1;
734 }
735 }
736 }
737
738 for position_ids in self.index.strategy_positions.values() {
739 for position_id in position_ids {
740 if !self.positions.contains_key(position_id) {
741 log::error!(
742 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
743 );
744 error_count += 1;
745 }
746 }
747 }
748
749 for client_order_id in &self.index.orders {
750 if !self.orders.contains_key(client_order_id) {
751 log::error!(
752 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
753 );
754 error_count += 1;
755 }
756 }
757
758 for client_order_id in &self.index.orders_emulated {
759 if !self.orders.contains_key(client_order_id) {
760 log::error!(
761 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
762 );
763 error_count += 1;
764 }
765 }
766
767 for client_order_id in &self.index.orders_inflight {
768 if !self.orders.contains_key(client_order_id) {
769 log::error!(
770 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
771 );
772 error_count += 1;
773 }
774 }
775
776 for client_order_id in &self.index.orders_open {
777 if !self.orders.contains_key(client_order_id) {
778 log::error!(
779 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
780 );
781 error_count += 1;
782 }
783 }
784
785 for client_order_id in &self.index.orders_closed {
786 if !self.orders.contains_key(client_order_id) {
787 log::error!(
788 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
789 );
790 error_count += 1;
791 }
792 }
793
794 for position_id in &self.index.positions {
795 if !self.positions.contains_key(position_id) {
796 log::error!(
797 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
798 );
799 error_count += 1;
800 }
801 }
802
803 for position_id in &self.index.positions_open {
804 if !self.positions.contains_key(position_id) {
805 log::error!(
806 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
807 );
808 error_count += 1;
809 }
810 }
811
812 for position_id in &self.index.positions_closed {
813 if !self.positions.contains_key(position_id) {
814 log::error!(
815 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
816 );
817 error_count += 1;
818 }
819 }
820
821 for strategy_id in &self.index.strategies {
822 if !self.index.strategy_orders.contains_key(strategy_id) {
823 log::error!(
824 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
825 );
826 error_count += 1;
827 }
828 }
829
830 for exec_algorithm_id in &self.index.exec_algorithms {
831 if !self
832 .index
833 .exec_algorithm_orders
834 .contains_key(exec_algorithm_id)
835 {
836 log::error!(
837 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
838 );
839 error_count += 1;
840 }
841 }
842
843 let total_us = SystemTime::now()
844 .duration_since(UNIX_EPOCH)
845 .expect("Time went backwards")
846 .as_micros()
847 - timestamp_us;
848
849 if error_count == 0 {
850 log::info!("Integrity check passed in {total_us}μs");
851 true
852 } else {
853 log::error!(
854 "Integrity check failed with {error_count} error{} in {total_us}μs",
855 if error_count == 1 { "" } else { "s" },
856 );
857 false
858 }
859 }
860
861 #[must_use]
865 pub fn check_residuals(&self) -> bool {
866 log::debug!("Checking residuals");
867
868 let mut residuals = false;
869
870 for order in self.orders_open(None, None, None, None) {
872 residuals = true;
873 log::warn!("Residual {order}");
874 }
875
876 for position in self.positions_open(None, None, None, None) {
878 residuals = true;
879 log::warn!("Residual {position}");
880 }
881
882 residuals
883 }
884
885 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
891 log::debug!(
892 "Purging closed orders{}",
893 if buffer_secs > 0 {
894 format!(" with buffer_secs={buffer_secs}")
895 } else {
896 String::new()
897 }
898 );
899
900 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
901
902 'outer: for client_order_id in self.index.orders_closed.clone() {
903 if let Some(order) = self.orders.get(&client_order_id)
904 && order.is_closed()
905 && let Some(ts_closed) = order.ts_closed()
906 && ts_closed + buffer_ns <= ts_now
907 {
908 if let Some(linked_order_ids) = order.linked_order_ids() {
910 for linked_order_id in linked_order_ids {
911 if let Some(linked_order) = self.orders.get(linked_order_id)
912 && linked_order.is_open()
913 {
914 continue 'outer;
916 }
917 }
918 }
919
920 self.purge_order(client_order_id);
921 }
922 }
923 }
924
925 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
927 log::debug!(
928 "Purging closed positions{}",
929 if buffer_secs > 0 {
930 format!(" with buffer_secs={buffer_secs}")
931 } else {
932 String::new()
933 }
934 );
935
936 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
937
938 for position_id in self.index.positions_closed.clone() {
939 if let Some(position) = self.positions.get(&position_id)
940 && position.is_closed()
941 && let Some(ts_closed) = position.ts_closed
942 && ts_closed + buffer_ns <= ts_now
943 {
944 self.purge_position(position_id);
945 }
946 }
947 }
948
949 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
953 let order = self.orders.get(&client_order_id).cloned();
955
956 if let Some(ref ord) = order
958 && ord.is_open()
959 {
960 log::warn!("Order {client_order_id} found open when purging, skipping purge");
961 return;
962 }
963
964 if let Some(ref ord) = order {
966 self.orders.remove(&client_order_id);
968
969 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
971 {
972 venue_orders.remove(&client_order_id);
973 }
974
975 if let Some(venue_order_id) = ord.venue_order_id() {
977 self.index.venue_order_ids.remove(&venue_order_id);
978 }
979
980 if let Some(instrument_orders) =
982 self.index.instrument_orders.get_mut(&ord.instrument_id())
983 {
984 instrument_orders.remove(&client_order_id);
985 }
986
987 if let Some(position_id) = ord.position_id()
989 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
990 {
991 position_orders.remove(&client_order_id);
992 }
993
994 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
996 && let Some(exec_algorithm_orders) =
997 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
998 {
999 exec_algorithm_orders.remove(&client_order_id);
1000 }
1001
1002 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1004 strategy_orders.remove(&client_order_id);
1005 if strategy_orders.is_empty() {
1006 self.index.strategy_orders.remove(&ord.strategy_id());
1007 }
1008 }
1009
1010 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1012 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1013 {
1014 spawn_orders.remove(&client_order_id);
1015 if spawn_orders.is_empty() {
1016 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1017 }
1018 }
1019
1020 log::info!("Purged order {client_order_id}");
1021 } else {
1022 log::warn!("Order {client_order_id} not found when purging");
1023 }
1024
1025 self.index.order_position.remove(&client_order_id);
1027 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1028 self.index.order_client.remove(&client_order_id);
1029 self.index.client_order_ids.remove(&client_order_id);
1030
1031 if let Some(strategy_id) = strategy_id
1033 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1034 {
1035 strategy_orders.remove(&client_order_id);
1036 if strategy_orders.is_empty() {
1037 self.index.strategy_orders.remove(&strategy_id);
1038 }
1039 }
1040
1041 self.index.exec_spawn_orders.remove(&client_order_id);
1043
1044 self.index.orders.remove(&client_order_id);
1045 self.index.orders_closed.remove(&client_order_id);
1046 self.index.orders_emulated.remove(&client_order_id);
1047 self.index.orders_inflight.remove(&client_order_id);
1048 self.index.orders_pending_cancel.remove(&client_order_id);
1049 }
1050
1051 pub fn purge_position(&mut self, position_id: PositionId) {
1055 let position = self.positions.get(&position_id).cloned();
1057
1058 if let Some(ref pos) = position
1060 && pos.is_open()
1061 {
1062 log::warn!("Position {position_id} found open when purging, skipping purge");
1063 return;
1064 }
1065
1066 if let Some(ref pos) = position {
1068 self.positions.remove(&position_id);
1069
1070 if let Some(venue_positions) =
1072 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1073 {
1074 venue_positions.remove(&position_id);
1075 }
1076
1077 if let Some(instrument_positions) =
1079 self.index.instrument_positions.get_mut(&pos.instrument_id)
1080 {
1081 instrument_positions.remove(&position_id);
1082 }
1083
1084 if let Some(strategy_positions) =
1086 self.index.strategy_positions.get_mut(&pos.strategy_id)
1087 {
1088 strategy_positions.remove(&position_id);
1089 }
1090
1091 for client_order_id in pos.client_order_ids() {
1093 self.index.order_position.remove(&client_order_id);
1094 }
1095
1096 log::info!("Purged position {position_id}");
1097 } else {
1098 log::warn!("Position {position_id} not found when purging");
1099 }
1100
1101 self.index.position_strategy.remove(&position_id);
1103 self.index.position_orders.remove(&position_id);
1104 self.index.positions.remove(&position_id);
1105 self.index.positions_open.remove(&position_id);
1106 self.index.positions_closed.remove(&position_id);
1107
1108 self.position_snapshots.remove(&position_id);
1110 }
1111
1112 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1117 log::debug!(
1118 "Purging account events{}",
1119 if lookback_secs > 0 {
1120 format!(" with lookback_secs={lookback_secs}")
1121 } else {
1122 String::new()
1123 }
1124 );
1125
1126 for account in self.accounts.values_mut() {
1127 let event_count = account.event_count();
1128 account.purge_account_events(ts_now, lookback_secs);
1129 let count_diff = event_count - account.event_count();
1130 if count_diff > 0 {
1131 log::info!(
1132 "Purged {} event(s) from account {}",
1133 count_diff,
1134 account.id()
1135 );
1136 }
1137 }
1138 }
1139
1140 pub fn clear_index(&mut self) {
1142 self.index.clear();
1143 log::debug!("Cleared index");
1144 }
1145
1146 pub fn reset(&mut self) {
1150 log::debug!("Resetting cache");
1151
1152 self.general.clear();
1153 self.currencies.clear();
1154 self.instruments.clear();
1155 self.synthetics.clear();
1156 self.books.clear();
1157 self.own_books.clear();
1158 self.quotes.clear();
1159 self.trades.clear();
1160 self.mark_xrates.clear();
1161 self.mark_prices.clear();
1162 self.index_prices.clear();
1163 self.bars.clear();
1164 self.accounts.clear();
1165 self.orders.clear();
1166 self.order_lists.clear();
1167 self.positions.clear();
1168 self.position_snapshots.clear();
1169 self.greeks.clear();
1170 self.yield_curves.clear();
1171
1172 #[cfg(feature = "defi")]
1173 {
1174 self.defi.pools.clear();
1175 self.defi.pool_profilers.clear();
1176 }
1177
1178 self.clear_index();
1179
1180 log::info!("Reset cache");
1181 }
1182
1183 pub fn dispose(&mut self) {
1187 if let Some(database) = &mut self.database
1188 && let Err(e) = database.close()
1189 {
1190 log::error!("Failed to close database during dispose: {e}");
1191 }
1192 }
1193
1194 pub fn flush_db(&mut self) {
1198 if let Some(database) = &mut self.database
1199 && let Err(e) = database.flush()
1200 {
1201 log::error!("Failed to flush database: {e}");
1202 }
1203 }
1204
1205 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1213 check_valid_string_ascii(key, stringify!(key))?;
1214 check_predicate_false(value.is_empty(), stringify!(value))?;
1215
1216 log::debug!("Adding general {key}");
1217 self.general.insert(key.to_string(), value.clone());
1218
1219 if let Some(database) = &mut self.database {
1220 database.add(key.to_string(), value)?;
1221 }
1222 Ok(())
1223 }
1224
1225 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1231 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1232
1233 if self.config.save_market_data
1234 && let Some(database) = &mut self.database
1235 {
1236 database.add_order_book(&book)?;
1237 }
1238
1239 self.books.insert(book.instrument_id, book);
1240 Ok(())
1241 }
1242
1243 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1249 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1250
1251 self.own_books.insert(own_book.instrument_id, own_book);
1252 Ok(())
1253 }
1254
1255 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1261 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1262
1263 if self.config.save_market_data {
1264 }
1266
1267 let mark_prices_deque = self
1268 .mark_prices
1269 .entry(mark_price.instrument_id)
1270 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1271 mark_prices_deque.push_front(mark_price);
1272 Ok(())
1273 }
1274
1275 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1281 log::debug!(
1282 "Adding `IndexPriceUpdate` for {}",
1283 index_price.instrument_id
1284 );
1285
1286 if self.config.save_market_data {
1287 }
1289
1290 let index_prices_deque = self
1291 .index_prices
1292 .entry(index_price.instrument_id)
1293 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1294 index_prices_deque.push_front(index_price);
1295 Ok(())
1296 }
1297
1298 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1304 log::debug!(
1305 "Adding `FundingRateUpdate` for {}",
1306 funding_rate.instrument_id
1307 );
1308
1309 if self.config.save_market_data {
1310 }
1312
1313 self.funding_rates
1314 .insert(funding_rate.instrument_id, funding_rate);
1315 Ok(())
1316 }
1317
1318 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1324 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1325
1326 if self.config.save_market_data
1327 && let Some(database) = &mut self.database
1328 {
1329 database.add_quote("e)?;
1330 }
1331
1332 let quotes_deque = self
1333 .quotes
1334 .entry(quote.instrument_id)
1335 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1336 quotes_deque.push_front(quote);
1337 Ok(())
1338 }
1339
1340 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1346 check_slice_not_empty(quotes, stringify!(quotes))?;
1347
1348 let instrument_id = quotes[0].instrument_id;
1349 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1350
1351 if self.config.save_market_data
1352 && let Some(database) = &mut self.database
1353 {
1354 for quote in quotes {
1355 database.add_quote(quote)?;
1356 }
1357 }
1358
1359 let quotes_deque = self
1360 .quotes
1361 .entry(instrument_id)
1362 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1363
1364 for quote in quotes {
1365 quotes_deque.push_front(*quote);
1366 }
1367 Ok(())
1368 }
1369
1370 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1376 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1377
1378 if self.config.save_market_data
1379 && let Some(database) = &mut self.database
1380 {
1381 database.add_trade(&trade)?;
1382 }
1383
1384 let trades_deque = self
1385 .trades
1386 .entry(trade.instrument_id)
1387 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1388 trades_deque.push_front(trade);
1389 Ok(())
1390 }
1391
1392 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1398 check_slice_not_empty(trades, stringify!(trades))?;
1399
1400 let instrument_id = trades[0].instrument_id;
1401 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1402
1403 if self.config.save_market_data
1404 && let Some(database) = &mut self.database
1405 {
1406 for trade in trades {
1407 database.add_trade(trade)?;
1408 }
1409 }
1410
1411 let trades_deque = self
1412 .trades
1413 .entry(instrument_id)
1414 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1415
1416 for trade in trades {
1417 trades_deque.push_front(*trade);
1418 }
1419 Ok(())
1420 }
1421
1422 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1428 log::debug!("Adding `Bar` {}", bar.bar_type);
1429
1430 if self.config.save_market_data
1431 && let Some(database) = &mut self.database
1432 {
1433 database.add_bar(&bar)?;
1434 }
1435
1436 let bars = self
1437 .bars
1438 .entry(bar.bar_type)
1439 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1440 bars.push_front(bar);
1441 Ok(())
1442 }
1443
1444 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1450 check_slice_not_empty(bars, stringify!(bars))?;
1451
1452 let bar_type = bars[0].bar_type;
1453 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1454
1455 if self.config.save_market_data
1456 && let Some(database) = &mut self.database
1457 {
1458 for bar in bars {
1459 database.add_bar(bar)?;
1460 }
1461 }
1462
1463 let bars_deque = self
1464 .bars
1465 .entry(bar_type)
1466 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1467
1468 for bar in bars {
1469 bars_deque.push_front(*bar);
1470 }
1471 Ok(())
1472 }
1473
1474 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1480 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1481
1482 if self.config.save_market_data
1483 && let Some(_database) = &mut self.database
1484 {
1485 }
1487
1488 self.greeks.insert(greeks.instrument_id, greeks);
1489 Ok(())
1490 }
1491
1492 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1494 self.greeks.get(instrument_id).cloned()
1495 }
1496
1497 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1503 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1504
1505 if self.config.save_market_data
1506 && let Some(_database) = &mut self.database
1507 {
1508 }
1510
1511 self.yield_curves
1512 .insert(yield_curve.curve_name.clone(), yield_curve);
1513 Ok(())
1514 }
1515
1516 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1518 self.yield_curves.get(key).map(|curve| {
1519 let curve_clone = curve.clone();
1520 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1521 as Box<dyn Fn(f64) -> f64>
1522 })
1523 }
1524
1525 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1531 log::debug!("Adding `Currency` {}", currency.code);
1532
1533 if let Some(database) = &mut self.database {
1534 database.add_currency(¤cy)?;
1535 }
1536
1537 self.currencies.insert(currency.code, currency);
1538 Ok(())
1539 }
1540
1541 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1547 log::debug!("Adding `Instrument` {}", instrument.id());
1548
1549 if let Some(database) = &mut self.database {
1550 database.add_instrument(&instrument)?;
1551 }
1552
1553 self.instruments.insert(instrument.id(), instrument);
1554 Ok(())
1555 }
1556
1557 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1563 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1564
1565 if let Some(database) = &mut self.database {
1566 database.add_synthetic(&synthetic)?;
1567 }
1568
1569 self.synthetics.insert(synthetic.id, synthetic);
1570 Ok(())
1571 }
1572
1573 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1579 log::debug!("Adding `Account` {}", account.id());
1580
1581 if let Some(database) = &mut self.database {
1582 database.add_account(&account)?;
1583 }
1584
1585 let account_id = account.id();
1586 self.accounts.insert(account_id, account);
1587 self.index
1588 .venue_account
1589 .insert(account_id.get_issuer(), account_id);
1590 Ok(())
1591 }
1592
1593 pub fn add_venue_order_id(
1601 &mut self,
1602 client_order_id: &ClientOrderId,
1603 venue_order_id: &VenueOrderId,
1604 overwrite: bool,
1605 ) -> anyhow::Result<()> {
1606 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1607 && !overwrite
1608 && existing_venue_order_id != venue_order_id
1609 {
1610 anyhow::bail!(
1611 "Existing {existing_venue_order_id} for {client_order_id}
1612 did not match the given {venue_order_id}.
1613 If you are writing a test then try a different `venue_order_id`,
1614 otherwise this is probably a bug."
1615 );
1616 }
1617
1618 self.index
1619 .client_order_ids
1620 .insert(*client_order_id, *venue_order_id);
1621 self.index
1622 .venue_order_ids
1623 .insert(*venue_order_id, *client_order_id);
1624
1625 Ok(())
1626 }
1627
1628 pub fn add_order(
1640 &mut self,
1641 order: OrderAny,
1642 position_id: Option<PositionId>,
1643 client_id: Option<ClientId>,
1644 replace_existing: bool,
1645 ) -> anyhow::Result<()> {
1646 let instrument_id = order.instrument_id();
1647 let venue = instrument_id.venue;
1648 let client_order_id = order.client_order_id();
1649 let strategy_id = order.strategy_id();
1650 let exec_algorithm_id = order.exec_algorithm_id();
1651 let exec_spawn_id = order.exec_spawn_id();
1652
1653 if !replace_existing {
1654 check_key_not_in_map(
1655 &client_order_id,
1656 &self.orders,
1657 stringify!(client_order_id),
1658 stringify!(orders),
1659 )?;
1660 }
1661
1662 log::debug!("Adding {order:?}");
1663
1664 self.index.orders.insert(client_order_id);
1665 self.index
1666 .order_strategy
1667 .insert(client_order_id, strategy_id);
1668 self.index.strategies.insert(strategy_id);
1669
1670 self.index
1672 .venue_orders
1673 .entry(venue)
1674 .or_default()
1675 .insert(client_order_id);
1676
1677 self.index
1679 .instrument_orders
1680 .entry(instrument_id)
1681 .or_default()
1682 .insert(client_order_id);
1683
1684 self.index
1686 .strategy_orders
1687 .entry(strategy_id)
1688 .or_default()
1689 .insert(client_order_id);
1690
1691 if let Some(exec_algorithm_id) = exec_algorithm_id {
1693 self.index.exec_algorithms.insert(exec_algorithm_id);
1694
1695 self.index
1696 .exec_algorithm_orders
1697 .entry(exec_algorithm_id)
1698 .or_default()
1699 .insert(client_order_id);
1700 }
1701
1702 if let Some(exec_spawn_id) = exec_spawn_id {
1704 self.index
1705 .exec_spawn_orders
1706 .entry(exec_spawn_id)
1707 .or_default()
1708 .insert(client_order_id);
1709 }
1710
1711 match order.emulation_trigger() {
1713 Some(_) => {
1714 self.index.orders_emulated.remove(&client_order_id);
1715 }
1716 None => {
1717 self.index.orders_emulated.insert(client_order_id);
1718 }
1719 }
1720
1721 if let Some(position_id) = position_id {
1723 self.add_position_id(
1724 &position_id,
1725 &order.instrument_id().venue,
1726 &client_order_id,
1727 &strategy_id,
1728 )?;
1729 }
1730
1731 if let Some(client_id) = client_id {
1733 self.index.order_client.insert(client_order_id, client_id);
1734 log::debug!("Indexed {client_id:?}");
1735 }
1736
1737 if let Some(database) = &mut self.database {
1738 database.add_order(&order, client_id)?;
1739 }
1744
1745 self.orders.insert(client_order_id, order);
1746
1747 Ok(())
1748 }
1749
1750 pub fn add_position_id(
1756 &mut self,
1757 position_id: &PositionId,
1758 venue: &Venue,
1759 client_order_id: &ClientOrderId,
1760 strategy_id: &StrategyId,
1761 ) -> anyhow::Result<()> {
1762 self.index
1763 .order_position
1764 .insert(*client_order_id, *position_id);
1765
1766 if let Some(database) = &mut self.database {
1768 database.index_order_position(*client_order_id, *position_id)?;
1769 }
1770
1771 self.index
1773 .position_strategy
1774 .insert(*position_id, *strategy_id);
1775
1776 self.index
1778 .position_orders
1779 .entry(*position_id)
1780 .or_default()
1781 .insert(*client_order_id);
1782
1783 self.index
1785 .strategy_positions
1786 .entry(*strategy_id)
1787 .or_default()
1788 .insert(*position_id);
1789
1790 self.index
1792 .venue_positions
1793 .entry(*venue)
1794 .or_default()
1795 .insert(*position_id);
1796
1797 Ok(())
1798 }
1799
1800 pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1806 self.positions.insert(position.id, position.clone());
1807 self.index.positions.insert(position.id);
1808 self.index.positions_open.insert(position.id);
1809 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
1812
1813 self.add_position_id(
1814 &position.id,
1815 &position.instrument_id.venue,
1816 &position.opening_order_id,
1817 &position.strategy_id,
1818 )?;
1819
1820 let venue = position.instrument_id.venue;
1821 let venue_positions = self.index.venue_positions.entry(venue).or_default();
1822 venue_positions.insert(position.id);
1823
1824 let instrument_id = position.instrument_id;
1826 let instrument_positions = self
1827 .index
1828 .instrument_positions
1829 .entry(instrument_id)
1830 .or_default();
1831 instrument_positions.insert(position.id);
1832
1833 if let Some(database) = &mut self.database {
1834 database.add_position(&position)?;
1835 }
1844
1845 Ok(())
1846 }
1847
1848 pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1854 if let Some(database) = &mut self.database {
1855 database.update_account(&account)?;
1856 }
1857 Ok(())
1858 }
1859
1860 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1866 let client_order_id = order.client_order_id();
1867
1868 if let Some(venue_order_id) = order.venue_order_id() {
1870 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1873 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1875 }
1876 }
1877
1878 if order.is_inflight() {
1880 self.index.orders_inflight.insert(client_order_id);
1881 } else {
1882 self.index.orders_inflight.remove(&client_order_id);
1883 }
1884
1885 if order.is_open() {
1887 self.index.orders_closed.remove(&client_order_id);
1888 self.index.orders_open.insert(client_order_id);
1889 } else if order.is_closed() {
1890 self.index.orders_open.remove(&client_order_id);
1891 self.index.orders_pending_cancel.remove(&client_order_id);
1892 self.index.orders_closed.insert(client_order_id);
1893 }
1894
1895 if let Some(emulation_trigger) = order.emulation_trigger() {
1897 match emulation_trigger {
1898 TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1899 _ => self.index.orders_emulated.insert(client_order_id),
1900 };
1901 }
1902
1903 if self.own_order_book(&order.instrument_id()).is_some()
1905 && should_handle_own_book_order(order)
1906 {
1907 self.update_own_order_book(order);
1908 }
1909
1910 if let Some(database) = &mut self.database {
1911 database.update_order(order.last_event())?;
1912 }
1917
1918 self.orders.insert(client_order_id, order.clone());
1920
1921 Ok(())
1922 }
1923
1924 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1926 self.index
1927 .orders_pending_cancel
1928 .insert(order.client_order_id());
1929 }
1930
1931 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1937 if position.is_open() {
1940 self.index.positions_open.insert(position.id);
1941 self.index.positions_closed.remove(&position.id);
1942 } else {
1943 self.index.positions_closed.insert(position.id);
1944 self.index.positions_open.remove(&position.id);
1945 }
1946
1947 if let Some(database) = &mut self.database {
1948 database.update_position(position)?;
1949 }
1954
1955 self.positions.insert(position.id, position.clone());
1956
1957 Ok(())
1958 }
1959
1960 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1967 let position_id = position.id;
1968
1969 let mut copied_position = position.clone();
1970 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1971 copied_position.id = PositionId::new(new_id);
1972
1973 let position_serialized = serde_json::to_vec(&copied_position)?;
1975
1976 let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1977 let new_snapshots = match snapshots {
1978 Some(existing_snapshots) => {
1979 let mut combined = existing_snapshots.to_vec();
1980 combined.extend(position_serialized);
1981 Bytes::from(combined)
1982 }
1983 None => Bytes::from(position_serialized),
1984 };
1985 self.position_snapshots.insert(position_id, new_snapshots);
1986
1987 log::debug!("Snapshot {copied_position}");
1988 Ok(())
1989 }
1990
1991 pub fn snapshot_position_state(
1997 &mut self,
1998 position: &Position,
1999 open_only: Option<bool>,
2002 ) -> anyhow::Result<()> {
2003 let open_only = open_only.unwrap_or(true);
2004
2005 if open_only && !position.is_open() {
2006 return Ok(());
2007 }
2008
2009 if let Some(database) = &mut self.database {
2010 database.snapshot_position_state(position).map_err(|e| {
2011 log::error!(
2012 "Failed to snapshot position state for {}: {e:?}",
2013 position.id
2014 );
2015 e
2016 })?;
2017 } else {
2018 log::warn!(
2019 "Cannot snapshot position state for {} (no database configured)",
2020 position.id
2021 );
2022 }
2023
2024 todo!()
2026 }
2027
2028 #[must_use]
2030 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2031 if self.index.position_strategy.contains_key(position_id) {
2033 Some(OmsType::Netting)
2036 } else {
2037 None
2038 }
2039 }
2040
2041 #[must_use]
2043 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2044 self.position_snapshots.get(position_id).map(|b| b.to_vec())
2045 }
2046
2047 #[must_use]
2049 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2050 let mut result = AHashSet::new();
2052 for (position_id, _) in &self.position_snapshots {
2053 if let Some(position) = self.positions.get(position_id)
2055 && position.instrument_id == *instrument_id
2056 {
2057 result.insert(*position_id);
2058 }
2059 }
2060 result
2061 }
2062
2063 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2069 let database = if let Some(database) = &self.database {
2070 database
2071 } else {
2072 log::warn!(
2073 "Cannot snapshot order state for {} (no database configured)",
2074 order.client_order_id()
2075 );
2076 return Ok(());
2077 };
2078
2079 database.snapshot_order_state(order)
2080 }
2081
2082 fn build_order_query_filter_set(
2085 &self,
2086 venue: Option<&Venue>,
2087 instrument_id: Option<&InstrumentId>,
2088 strategy_id: Option<&StrategyId>,
2089 ) -> Option<AHashSet<ClientOrderId>> {
2090 let mut query: Option<AHashSet<ClientOrderId>> = None;
2091
2092 if let Some(venue) = venue {
2093 query = Some(
2094 self.index
2095 .venue_orders
2096 .get(venue)
2097 .cloned()
2098 .unwrap_or_default(),
2099 );
2100 }
2101
2102 if let Some(instrument_id) = instrument_id {
2103 let instrument_orders = self
2104 .index
2105 .instrument_orders
2106 .get(instrument_id)
2107 .cloned()
2108 .unwrap_or_default();
2109
2110 if let Some(existing_query) = &mut query {
2111 *existing_query = existing_query
2112 .intersection(&instrument_orders)
2113 .copied()
2114 .collect();
2115 } else {
2116 query = Some(instrument_orders);
2117 }
2118 }
2119
2120 if let Some(strategy_id) = strategy_id {
2121 let strategy_orders = self
2122 .index
2123 .strategy_orders
2124 .get(strategy_id)
2125 .cloned()
2126 .unwrap_or_default();
2127
2128 if let Some(existing_query) = &mut query {
2129 *existing_query = existing_query
2130 .intersection(&strategy_orders)
2131 .copied()
2132 .collect();
2133 } else {
2134 query = Some(strategy_orders);
2135 }
2136 }
2137
2138 query
2139 }
2140
2141 fn build_position_query_filter_set(
2142 &self,
2143 venue: Option<&Venue>,
2144 instrument_id: Option<&InstrumentId>,
2145 strategy_id: Option<&StrategyId>,
2146 ) -> Option<AHashSet<PositionId>> {
2147 let mut query: Option<AHashSet<PositionId>> = None;
2148
2149 if let Some(venue) = venue {
2150 query = Some(
2151 self.index
2152 .venue_positions
2153 .get(venue)
2154 .cloned()
2155 .unwrap_or_default(),
2156 );
2157 }
2158
2159 if let Some(instrument_id) = instrument_id {
2160 let instrument_positions = self
2161 .index
2162 .instrument_positions
2163 .get(instrument_id)
2164 .cloned()
2165 .unwrap_or_default();
2166
2167 if let Some(existing_query) = query {
2168 query = Some(
2169 existing_query
2170 .intersection(&instrument_positions)
2171 .copied()
2172 .collect(),
2173 );
2174 } else {
2175 query = Some(instrument_positions);
2176 }
2177 }
2178
2179 if let Some(strategy_id) = strategy_id {
2180 let strategy_positions = self
2181 .index
2182 .strategy_positions
2183 .get(strategy_id)
2184 .cloned()
2185 .unwrap_or_default();
2186
2187 if let Some(existing_query) = query {
2188 query = Some(
2189 existing_query
2190 .intersection(&strategy_positions)
2191 .copied()
2192 .collect(),
2193 );
2194 } else {
2195 query = Some(strategy_positions);
2196 }
2197 }
2198
2199 query
2200 }
2201
2202 fn get_orders_for_ids(
2208 &self,
2209 client_order_ids: &AHashSet<ClientOrderId>,
2210 side: Option<OrderSide>,
2211 ) -> Vec<&OrderAny> {
2212 let side = side.unwrap_or(OrderSide::NoOrderSide);
2213 let mut orders = Vec::new();
2214
2215 for client_order_id in client_order_ids {
2216 let order = self
2217 .orders
2218 .get(client_order_id)
2219 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2220 if side == OrderSide::NoOrderSide || side == order.order_side() {
2221 orders.push(order);
2222 }
2223 }
2224
2225 orders
2226 }
2227
2228 fn get_positions_for_ids(
2234 &self,
2235 position_ids: &AHashSet<PositionId>,
2236 side: Option<PositionSide>,
2237 ) -> Vec<&Position> {
2238 let side = side.unwrap_or(PositionSide::NoPositionSide);
2239 let mut positions = Vec::new();
2240
2241 for position_id in position_ids {
2242 let position = self
2243 .positions
2244 .get(position_id)
2245 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2246 if side == PositionSide::NoPositionSide || side == position.side {
2247 positions.push(position);
2248 }
2249 }
2250
2251 positions
2252 }
2253
2254 #[must_use]
2256 pub fn client_order_ids(
2257 &self,
2258 venue: Option<&Venue>,
2259 instrument_id: Option<&InstrumentId>,
2260 strategy_id: Option<&StrategyId>,
2261 ) -> AHashSet<ClientOrderId> {
2262 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2263 match query {
2264 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2265 None => self.index.orders.clone(),
2266 }
2267 }
2268
2269 #[must_use]
2271 pub fn client_order_ids_open(
2272 &self,
2273 venue: Option<&Venue>,
2274 instrument_id: Option<&InstrumentId>,
2275 strategy_id: Option<&StrategyId>,
2276 ) -> AHashSet<ClientOrderId> {
2277 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2278 match query {
2279 Some(query) => self
2280 .index
2281 .orders_open
2282 .intersection(&query)
2283 .copied()
2284 .collect(),
2285 None => self.index.orders_open.clone(),
2286 }
2287 }
2288
2289 #[must_use]
2291 pub fn client_order_ids_closed(
2292 &self,
2293 venue: Option<&Venue>,
2294 instrument_id: Option<&InstrumentId>,
2295 strategy_id: Option<&StrategyId>,
2296 ) -> AHashSet<ClientOrderId> {
2297 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2298 match query {
2299 Some(query) => self
2300 .index
2301 .orders_closed
2302 .intersection(&query)
2303 .copied()
2304 .collect(),
2305 None => self.index.orders_closed.clone(),
2306 }
2307 }
2308
2309 #[must_use]
2311 pub fn client_order_ids_emulated(
2312 &self,
2313 venue: Option<&Venue>,
2314 instrument_id: Option<&InstrumentId>,
2315 strategy_id: Option<&StrategyId>,
2316 ) -> AHashSet<ClientOrderId> {
2317 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2318 match query {
2319 Some(query) => self
2320 .index
2321 .orders_emulated
2322 .intersection(&query)
2323 .copied()
2324 .collect(),
2325 None => self.index.orders_emulated.clone(),
2326 }
2327 }
2328
2329 #[must_use]
2331 pub fn client_order_ids_inflight(
2332 &self,
2333 venue: Option<&Venue>,
2334 instrument_id: Option<&InstrumentId>,
2335 strategy_id: Option<&StrategyId>,
2336 ) -> AHashSet<ClientOrderId> {
2337 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2338 match query {
2339 Some(query) => self
2340 .index
2341 .orders_inflight
2342 .intersection(&query)
2343 .copied()
2344 .collect(),
2345 None => self.index.orders_inflight.clone(),
2346 }
2347 }
2348
2349 #[must_use]
2351 pub fn position_ids(
2352 &self,
2353 venue: Option<&Venue>,
2354 instrument_id: Option<&InstrumentId>,
2355 strategy_id: Option<&StrategyId>,
2356 ) -> AHashSet<PositionId> {
2357 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2358 match query {
2359 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2360 None => self.index.positions.clone(),
2361 }
2362 }
2363
2364 #[must_use]
2366 pub fn position_open_ids(
2367 &self,
2368 venue: Option<&Venue>,
2369 instrument_id: Option<&InstrumentId>,
2370 strategy_id: Option<&StrategyId>,
2371 ) -> AHashSet<PositionId> {
2372 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2373 match query {
2374 Some(query) => self
2375 .index
2376 .positions_open
2377 .intersection(&query)
2378 .copied()
2379 .collect(),
2380 None => self.index.positions_open.clone(),
2381 }
2382 }
2383
2384 #[must_use]
2386 pub fn position_closed_ids(
2387 &self,
2388 venue: Option<&Venue>,
2389 instrument_id: Option<&InstrumentId>,
2390 strategy_id: Option<&StrategyId>,
2391 ) -> AHashSet<PositionId> {
2392 let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2393 match query {
2394 Some(query) => self
2395 .index
2396 .positions_closed
2397 .intersection(&query)
2398 .copied()
2399 .collect(),
2400 None => self.index.positions_closed.clone(),
2401 }
2402 }
2403
2404 #[must_use]
2406 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2407 self.index.actors.clone()
2408 }
2409
2410 #[must_use]
2412 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2413 self.index.strategies.clone()
2414 }
2415
2416 #[must_use]
2418 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2419 self.index.exec_algorithms.clone()
2420 }
2421
2422 #[must_use]
2426 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2427 self.orders.get(client_order_id)
2428 }
2429
2430 #[must_use]
2432 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2433 self.orders.get_mut(client_order_id)
2434 }
2435
2436 #[must_use]
2438 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2439 self.index.venue_order_ids.get(venue_order_id)
2440 }
2441
2442 #[must_use]
2444 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2445 self.index.client_order_ids.get(client_order_id)
2446 }
2447
2448 #[must_use]
2450 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2451 self.index.order_client.get(client_order_id)
2452 }
2453
2454 #[must_use]
2456 pub fn orders(
2457 &self,
2458 venue: Option<&Venue>,
2459 instrument_id: Option<&InstrumentId>,
2460 strategy_id: Option<&StrategyId>,
2461 side: Option<OrderSide>,
2462 ) -> Vec<&OrderAny> {
2463 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2464 self.get_orders_for_ids(&client_order_ids, side)
2465 }
2466
2467 #[must_use]
2469 pub fn orders_open(
2470 &self,
2471 venue: Option<&Venue>,
2472 instrument_id: Option<&InstrumentId>,
2473 strategy_id: Option<&StrategyId>,
2474 side: Option<OrderSide>,
2475 ) -> Vec<&OrderAny> {
2476 let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2477 self.get_orders_for_ids(&client_order_ids, side)
2478 }
2479
2480 #[must_use]
2482 pub fn orders_closed(
2483 &self,
2484 venue: Option<&Venue>,
2485 instrument_id: Option<&InstrumentId>,
2486 strategy_id: Option<&StrategyId>,
2487 side: Option<OrderSide>,
2488 ) -> Vec<&OrderAny> {
2489 let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2490 self.get_orders_for_ids(&client_order_ids, side)
2491 }
2492
2493 #[must_use]
2495 pub fn orders_emulated(
2496 &self,
2497 venue: Option<&Venue>,
2498 instrument_id: Option<&InstrumentId>,
2499 strategy_id: Option<&StrategyId>,
2500 side: Option<OrderSide>,
2501 ) -> Vec<&OrderAny> {
2502 let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2503 self.get_orders_for_ids(&client_order_ids, side)
2504 }
2505
2506 #[must_use]
2508 pub fn orders_inflight(
2509 &self,
2510 venue: Option<&Venue>,
2511 instrument_id: Option<&InstrumentId>,
2512 strategy_id: Option<&StrategyId>,
2513 side: Option<OrderSide>,
2514 ) -> Vec<&OrderAny> {
2515 let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2516 self.get_orders_for_ids(&client_order_ids, side)
2517 }
2518
2519 #[must_use]
2521 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2522 let client_order_ids = self.index.position_orders.get(position_id);
2523 match client_order_ids {
2524 Some(client_order_ids) => {
2525 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2526 }
2527 None => Vec::new(),
2528 }
2529 }
2530
2531 #[must_use]
2533 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2534 self.index.orders.contains(client_order_id)
2535 }
2536
2537 #[must_use]
2539 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2540 self.index.orders_open.contains(client_order_id)
2541 }
2542
2543 #[must_use]
2545 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2546 self.index.orders_closed.contains(client_order_id)
2547 }
2548
2549 #[must_use]
2551 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2552 self.index.orders_emulated.contains(client_order_id)
2553 }
2554
2555 #[must_use]
2557 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2558 self.index.orders_inflight.contains(client_order_id)
2559 }
2560
2561 #[must_use]
2563 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2564 self.index.orders_pending_cancel.contains(client_order_id)
2565 }
2566
2567 #[must_use]
2569 pub fn orders_open_count(
2570 &self,
2571 venue: Option<&Venue>,
2572 instrument_id: Option<&InstrumentId>,
2573 strategy_id: Option<&StrategyId>,
2574 side: Option<OrderSide>,
2575 ) -> usize {
2576 self.orders_open(venue, instrument_id, strategy_id, side)
2577 .len()
2578 }
2579
2580 #[must_use]
2582 pub fn orders_closed_count(
2583 &self,
2584 venue: Option<&Venue>,
2585 instrument_id: Option<&InstrumentId>,
2586 strategy_id: Option<&StrategyId>,
2587 side: Option<OrderSide>,
2588 ) -> usize {
2589 self.orders_closed(venue, instrument_id, strategy_id, side)
2590 .len()
2591 }
2592
2593 #[must_use]
2595 pub fn orders_emulated_count(
2596 &self,
2597 venue: Option<&Venue>,
2598 instrument_id: Option<&InstrumentId>,
2599 strategy_id: Option<&StrategyId>,
2600 side: Option<OrderSide>,
2601 ) -> usize {
2602 self.orders_emulated(venue, instrument_id, strategy_id, side)
2603 .len()
2604 }
2605
2606 #[must_use]
2608 pub fn orders_inflight_count(
2609 &self,
2610 venue: Option<&Venue>,
2611 instrument_id: Option<&InstrumentId>,
2612 strategy_id: Option<&StrategyId>,
2613 side: Option<OrderSide>,
2614 ) -> usize {
2615 self.orders_inflight(venue, instrument_id, strategy_id, side)
2616 .len()
2617 }
2618
2619 #[must_use]
2621 pub fn orders_total_count(
2622 &self,
2623 venue: Option<&Venue>,
2624 instrument_id: Option<&InstrumentId>,
2625 strategy_id: Option<&StrategyId>,
2626 side: Option<OrderSide>,
2627 ) -> usize {
2628 self.orders(venue, instrument_id, strategy_id, side).len()
2629 }
2630
2631 #[must_use]
2633 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2634 self.order_lists.get(order_list_id)
2635 }
2636
2637 #[must_use]
2639 pub fn order_lists(
2640 &self,
2641 venue: Option<&Venue>,
2642 instrument_id: Option<&InstrumentId>,
2643 strategy_id: Option<&StrategyId>,
2644 ) -> Vec<&OrderList> {
2645 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2646
2647 if let Some(venue) = venue {
2648 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2649 }
2650
2651 if let Some(instrument_id) = instrument_id {
2652 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2653 }
2654
2655 if let Some(strategy_id) = strategy_id {
2656 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2657 }
2658
2659 order_lists
2660 }
2661
2662 #[must_use]
2664 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2665 self.order_lists.contains_key(order_list_id)
2666 }
2667
2668 #[must_use]
2673 pub fn orders_for_exec_algorithm(
2674 &self,
2675 exec_algorithm_id: &ExecAlgorithmId,
2676 venue: Option<&Venue>,
2677 instrument_id: Option<&InstrumentId>,
2678 strategy_id: Option<&StrategyId>,
2679 side: Option<OrderSide>,
2680 ) -> Vec<&OrderAny> {
2681 let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2682 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2683
2684 if let Some(query) = query
2685 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2686 {
2687 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2688 }
2689
2690 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2691 self.get_orders_for_ids(exec_algorithm_order_ids, side)
2692 } else {
2693 Vec::new()
2694 }
2695 }
2696
2697 #[must_use]
2699 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2700 self.get_orders_for_ids(
2701 self.index
2702 .exec_spawn_orders
2703 .get(exec_spawn_id)
2704 .unwrap_or(&AHashSet::new()),
2705 None,
2706 )
2707 }
2708
2709 #[must_use]
2711 pub fn exec_spawn_total_quantity(
2712 &self,
2713 exec_spawn_id: &ClientOrderId,
2714 active_only: bool,
2715 ) -> Option<Quantity> {
2716 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2717
2718 let mut total_quantity: Option<Quantity> = None;
2719
2720 for spawn_order in exec_spawn_orders {
2721 if active_only && spawn_order.is_closed() {
2722 continue;
2723 }
2724
2725 match total_quantity.as_mut() {
2726 Some(total) => *total += spawn_order.quantity(),
2727 None => total_quantity = Some(spawn_order.quantity()),
2728 }
2729 }
2730
2731 total_quantity
2732 }
2733
2734 #[must_use]
2736 pub fn exec_spawn_total_filled_qty(
2737 &self,
2738 exec_spawn_id: &ClientOrderId,
2739 active_only: bool,
2740 ) -> Option<Quantity> {
2741 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2742
2743 let mut total_quantity: Option<Quantity> = None;
2744
2745 for spawn_order in exec_spawn_orders {
2746 if active_only && spawn_order.is_closed() {
2747 continue;
2748 }
2749
2750 match total_quantity.as_mut() {
2751 Some(total) => *total += spawn_order.filled_qty(),
2752 None => total_quantity = Some(spawn_order.filled_qty()),
2753 }
2754 }
2755
2756 total_quantity
2757 }
2758
2759 #[must_use]
2761 pub fn exec_spawn_total_leaves_qty(
2762 &self,
2763 exec_spawn_id: &ClientOrderId,
2764 active_only: bool,
2765 ) -> Option<Quantity> {
2766 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2767
2768 let mut total_quantity: Option<Quantity> = None;
2769
2770 for spawn_order in exec_spawn_orders {
2771 if active_only && spawn_order.is_closed() {
2772 continue;
2773 }
2774
2775 match total_quantity.as_mut() {
2776 Some(total) => *total += spawn_order.leaves_qty(),
2777 None => total_quantity = Some(spawn_order.leaves_qty()),
2778 }
2779 }
2780
2781 total_quantity
2782 }
2783
2784 #[must_use]
2788 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2789 self.positions.get(position_id)
2790 }
2791
2792 #[must_use]
2794 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2795 self.index
2796 .order_position
2797 .get(client_order_id)
2798 .and_then(|position_id| self.positions.get(position_id))
2799 }
2800
2801 #[must_use]
2803 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2804 self.index.order_position.get(client_order_id)
2805 }
2806
2807 #[must_use]
2809 pub fn positions(
2810 &self,
2811 venue: Option<&Venue>,
2812 instrument_id: Option<&InstrumentId>,
2813 strategy_id: Option<&StrategyId>,
2814 side: Option<PositionSide>,
2815 ) -> Vec<&Position> {
2816 let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2817 self.get_positions_for_ids(&position_ids, side)
2818 }
2819
2820 #[must_use]
2822 pub fn positions_open(
2823 &self,
2824 venue: Option<&Venue>,
2825 instrument_id: Option<&InstrumentId>,
2826 strategy_id: Option<&StrategyId>,
2827 side: Option<PositionSide>,
2828 ) -> Vec<&Position> {
2829 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2830 self.get_positions_for_ids(&position_ids, side)
2831 }
2832
2833 #[must_use]
2835 pub fn positions_closed(
2836 &self,
2837 venue: Option<&Venue>,
2838 instrument_id: Option<&InstrumentId>,
2839 strategy_id: Option<&StrategyId>,
2840 side: Option<PositionSide>,
2841 ) -> Vec<&Position> {
2842 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2843 self.get_positions_for_ids(&position_ids, side)
2844 }
2845
2846 #[must_use]
2848 pub fn position_exists(&self, position_id: &PositionId) -> bool {
2849 self.index.positions.contains(position_id)
2850 }
2851
2852 #[must_use]
2854 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2855 self.index.positions_open.contains(position_id)
2856 }
2857
2858 #[must_use]
2860 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2861 self.index.positions_closed.contains(position_id)
2862 }
2863
2864 #[must_use]
2866 pub fn positions_open_count(
2867 &self,
2868 venue: Option<&Venue>,
2869 instrument_id: Option<&InstrumentId>,
2870 strategy_id: Option<&StrategyId>,
2871 side: Option<PositionSide>,
2872 ) -> usize {
2873 self.positions_open(venue, instrument_id, strategy_id, side)
2874 .len()
2875 }
2876
2877 #[must_use]
2879 pub fn positions_closed_count(
2880 &self,
2881 venue: Option<&Venue>,
2882 instrument_id: Option<&InstrumentId>,
2883 strategy_id: Option<&StrategyId>,
2884 side: Option<PositionSide>,
2885 ) -> usize {
2886 self.positions_closed(venue, instrument_id, strategy_id, side)
2887 .len()
2888 }
2889
2890 #[must_use]
2892 pub fn positions_total_count(
2893 &self,
2894 venue: Option<&Venue>,
2895 instrument_id: Option<&InstrumentId>,
2896 strategy_id: Option<&StrategyId>,
2897 side: Option<PositionSide>,
2898 ) -> usize {
2899 self.positions(venue, instrument_id, strategy_id, side)
2900 .len()
2901 }
2902
2903 #[must_use]
2907 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2908 self.index.order_strategy.get(client_order_id)
2909 }
2910
2911 #[must_use]
2913 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2914 self.index.position_strategy.get(position_id)
2915 }
2916
2917 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2925 check_valid_string_ascii(key, stringify!(key))?;
2926
2927 Ok(self.general.get(key))
2928 }
2929
2930 #[must_use]
2934 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2935 match price_type {
2936 PriceType::Bid => self
2937 .quotes
2938 .get(instrument_id)
2939 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2940 PriceType::Ask => self
2941 .quotes
2942 .get(instrument_id)
2943 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2944 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2945 quotes.front().map(|quote| {
2946 Price::new(
2947 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2948 quote.bid_price.precision + 1,
2949 )
2950 })
2951 }),
2952 PriceType::Last => self
2953 .trades
2954 .get(instrument_id)
2955 .and_then(|trades| trades.front().map(|trade| trade.price)),
2956 PriceType::Mark => self
2957 .mark_prices
2958 .get(instrument_id)
2959 .and_then(|marks| marks.front().map(|mark| mark.value)),
2960 }
2961 }
2962
2963 #[must_use]
2965 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2966 self.quotes
2967 .get(instrument_id)
2968 .map(|quotes| quotes.iter().copied().collect())
2969 }
2970
2971 #[must_use]
2973 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2974 self.trades
2975 .get(instrument_id)
2976 .map(|trades| trades.iter().copied().collect())
2977 }
2978
2979 #[must_use]
2981 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2982 self.mark_prices
2983 .get(instrument_id)
2984 .map(|mark_prices| mark_prices.iter().copied().collect())
2985 }
2986
2987 #[must_use]
2989 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2990 self.index_prices
2991 .get(instrument_id)
2992 .map(|index_prices| index_prices.iter().copied().collect())
2993 }
2994
2995 #[must_use]
2997 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2998 self.bars
2999 .get(bar_type)
3000 .map(|bars| bars.iter().copied().collect())
3001 }
3002
3003 #[must_use]
3005 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3006 self.books.get(instrument_id)
3007 }
3008
3009 #[must_use]
3011 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3012 self.books.get_mut(instrument_id)
3013 }
3014
3015 #[must_use]
3017 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3018 self.own_books.get(instrument_id)
3019 }
3020
3021 #[must_use]
3023 pub fn own_order_book_mut(
3024 &mut self,
3025 instrument_id: &InstrumentId,
3026 ) -> Option<&mut OwnOrderBook> {
3027 self.own_books.get_mut(instrument_id)
3028 }
3029
3030 #[must_use]
3032 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3033 self.quotes
3034 .get(instrument_id)
3035 .and_then(|quotes| quotes.front())
3036 }
3037
3038 #[must_use]
3040 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3041 self.trades
3042 .get(instrument_id)
3043 .and_then(|trades| trades.front())
3044 }
3045
3046 #[must_use]
3048 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3049 self.mark_prices
3050 .get(instrument_id)
3051 .and_then(|mark_prices| mark_prices.front())
3052 }
3053
3054 #[must_use]
3056 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3057 self.index_prices
3058 .get(instrument_id)
3059 .and_then(|index_prices| index_prices.front())
3060 }
3061
3062 #[must_use]
3064 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3065 self.funding_rates.get(instrument_id)
3066 }
3067
3068 #[must_use]
3070 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3071 self.bars.get(bar_type).and_then(|bars| bars.front())
3072 }
3073
3074 #[must_use]
3076 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3077 self.books
3078 .get(instrument_id)
3079 .map_or(0, |book| book.update_count) as usize
3080 }
3081
3082 #[must_use]
3084 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3085 self.quotes
3086 .get(instrument_id)
3087 .map_or(0, std::collections::VecDeque::len)
3088 }
3089
3090 #[must_use]
3092 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3093 self.trades
3094 .get(instrument_id)
3095 .map_or(0, std::collections::VecDeque::len)
3096 }
3097
3098 #[must_use]
3100 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3101 self.bars
3102 .get(bar_type)
3103 .map_or(0, std::collections::VecDeque::len)
3104 }
3105
3106 #[must_use]
3108 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3109 self.books.contains_key(instrument_id)
3110 }
3111
3112 #[must_use]
3114 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3115 self.quote_count(instrument_id) > 0
3116 }
3117
3118 #[must_use]
3120 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3121 self.trade_count(instrument_id) > 0
3122 }
3123
3124 #[must_use]
3126 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3127 self.bar_count(bar_type) > 0
3128 }
3129
3130 #[must_use]
3131 pub fn get_xrate(
3132 &self,
3133 venue: Venue,
3134 from_currency: Currency,
3135 to_currency: Currency,
3136 price_type: PriceType,
3137 ) -> Option<f64> {
3138 if from_currency == to_currency {
3139 return Some(1.0);
3142 }
3143
3144 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3145
3146 match get_exchange_rate(
3147 from_currency.code,
3148 to_currency.code,
3149 price_type,
3150 bid_quote,
3151 ask_quote,
3152 ) {
3153 Ok(rate) => rate,
3154 Err(e) => {
3155 log::error!("Failed to calculate xrate: {e}");
3156 None
3157 }
3158 }
3159 }
3160
3161 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3162 let mut bid_quotes = AHashMap::new();
3163 let mut ask_quotes = AHashMap::new();
3164
3165 for instrument_id in self.instruments.keys() {
3166 if instrument_id.venue != *venue {
3167 continue;
3168 }
3169
3170 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3171 if let Some(tick) = ticks.front() {
3172 (tick.bid_price, tick.ask_price)
3173 } else {
3174 continue; }
3176 } else {
3177 let bid_bar = self
3178 .bars
3179 .iter()
3180 .find(|(k, _)| {
3181 k.instrument_id() == *instrument_id
3182 && matches!(k.spec().price_type, PriceType::Bid)
3183 })
3184 .map(|(_, v)| v);
3185
3186 let ask_bar = self
3187 .bars
3188 .iter()
3189 .find(|(k, _)| {
3190 k.instrument_id() == *instrument_id
3191 && matches!(k.spec().price_type, PriceType::Ask)
3192 })
3193 .map(|(_, v)| v);
3194
3195 match (bid_bar, ask_bar) {
3196 (Some(bid), Some(ask)) => {
3197 match (bid.front(), ask.front()) {
3198 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3199 _ => {
3200 continue;
3202 }
3203 }
3204 }
3205 _ => continue,
3206 }
3207 };
3208
3209 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3210 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3211 }
3212
3213 (bid_quotes, ask_quotes)
3214 }
3215
3216 #[must_use]
3218 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3219 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3220 }
3221
3222 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3228 assert!(xrate > 0.0, "xrate was zero");
3229 self.mark_xrates.insert((from_currency, to_currency), xrate);
3230 self.mark_xrates
3231 .insert((to_currency, from_currency), 1.0 / xrate);
3232 }
3233
3234 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3236 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3237 }
3238
3239 pub fn clear_mark_xrates(&mut self) {
3241 self.mark_xrates.clear();
3242 }
3243
3244 #[must_use]
3248 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3249 self.instruments.get(instrument_id)
3250 }
3251
3252 #[must_use]
3254 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3255 match venue {
3256 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3257 None => self.instruments.keys().collect(),
3258 }
3259 }
3260
3261 #[must_use]
3263 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3264 self.instruments
3265 .values()
3266 .filter(|i| &i.id().venue == venue)
3267 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3268 .collect()
3269 }
3270
3271 #[must_use]
3273 pub fn bar_types(
3274 &self,
3275 instrument_id: Option<&InstrumentId>,
3276 price_type: Option<&PriceType>,
3277 aggregation_source: AggregationSource,
3278 ) -> Vec<&BarType> {
3279 let mut bar_types = self
3280 .bars
3281 .keys()
3282 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3283 .collect::<Vec<&BarType>>();
3284
3285 if let Some(instrument_id) = instrument_id {
3286 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3287 }
3288
3289 if let Some(price_type) = price_type {
3290 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3291 }
3292
3293 bar_types
3294 }
3295
3296 #[must_use]
3300 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3301 self.synthetics.get(instrument_id)
3302 }
3303
3304 #[must_use]
3306 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3307 self.synthetics.keys().collect()
3308 }
3309
3310 #[must_use]
3312 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3313 self.synthetics.values().collect()
3314 }
3315
3316 #[must_use]
3320 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3321 self.accounts.get(account_id)
3322 }
3323
3324 #[must_use]
3326 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3327 self.index
3328 .venue_account
3329 .get(venue)
3330 .and_then(|account_id| self.accounts.get(account_id))
3331 }
3332
3333 #[must_use]
3335 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3336 self.index.venue_account.get(venue)
3337 }
3338
3339 #[must_use]
3341 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3342 self.accounts
3343 .values()
3344 .filter(|account| &account.id() == account_id)
3345 .collect()
3346 }
3347
3348 pub fn update_own_order_book(&mut self, order: &OrderAny) {
3356 if !order.has_price() {
3357 return;
3358 }
3359
3360 let instrument_id = order.instrument_id();
3361
3362 let own_book = self
3363 .own_books
3364 .entry(instrument_id)
3365 .or_insert_with(|| OwnOrderBook::new(instrument_id));
3366
3367 let own_book_order = order.to_own_book_order();
3368
3369 if order.is_closed() {
3370 if let Err(e) = own_book.delete(own_book_order) {
3371 log::debug!(
3372 "Failed to delete order {} from own book: {e}",
3373 order.client_order_id(),
3374 );
3375 } else {
3376 log::debug!("Deleted order {} from own book", order.client_order_id());
3377 }
3378 } else {
3379 if let Err(e) = own_book.update(own_book_order) {
3381 log::debug!(
3382 "Failed to update order {} in own book: {e}; inserting instead",
3383 order.client_order_id(),
3384 );
3385 own_book.add(own_book_order);
3386 }
3387 log::debug!("Updated order {} in own book", order.client_order_id());
3388 }
3389 }
3390
3391 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3397 let order = match self.orders.get(client_order_id) {
3398 Some(order) => order,
3399 None => return,
3400 };
3401
3402 self.index.orders_open.remove(client_order_id);
3403 self.index.orders_pending_cancel.remove(client_order_id);
3404 self.index.orders_inflight.remove(client_order_id);
3405 self.index.orders_emulated.remove(client_order_id);
3406
3407 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3408 && order.has_price()
3409 {
3410 let own_book_order = order.to_own_book_order();
3411 if let Err(e) = own_book.delete(own_book_order) {
3412 log::debug!("Could not force delete {client_order_id} from own book: {e}");
3413 } else {
3414 log::debug!("Force deleted {client_order_id} from own book");
3415 }
3416 }
3417
3418 self.index.orders_closed.insert(*client_order_id);
3419 }
3420
3421 pub fn audit_own_order_books(&mut self) {
3428 log::debug!("Starting own books audit");
3429 let start = std::time::Instant::now();
3430
3431 let valid_order_ids: AHashSet<ClientOrderId> = self
3434 .index
3435 .orders_open
3436 .union(&self.index.orders_inflight)
3437 .copied()
3438 .collect();
3439
3440 for own_book in self.own_books.values_mut() {
3441 own_book.audit_open_orders(&valid_order_ids);
3442 }
3443
3444 log::debug!("Completed own books audit in {:?}", start.elapsed());
3445 }
3446}