nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod quote;
23
24mod index;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30    collections::VecDeque,
31    fmt::Debug,
32    time::{SystemTime, UNIX_EPOCH},
33};
34
35use ahash::{AHashMap, AHashSet};
36use bytes::Bytes;
37pub use config::CacheConfig; // Re-export
38use database::{CacheDatabaseAdapter, CacheMap};
39use index::CacheIndex;
40use nautilus_core::{
41    UUID4, UnixNanos,
42    correctness::{
43        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
44        check_valid_string_ascii,
45    },
46    datetime::secs_to_nanos_unchecked,
47};
48use nautilus_model::{
49    accounts::{Account, AccountAny},
50    data::{
51        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
52        TradeTick, YieldCurveData,
53    },
54    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
55    identifiers::{
56        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
57        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
58    },
59    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
60    orderbook::{
61        OrderBook,
62        own::{OwnOrderBook, should_handle_own_book_order},
63    },
64    orders::{Order, OrderAny, OrderList},
65    position::Position,
66    types::{Currency, Money, Price, Quantity},
67};
68use ustr::Ustr;
69
70use crate::xrate::get_exchange_rate;
71
/// A common in-memory `Cache` for market and execution related data.
///
/// The data collections are hash maps keyed by the relevant identifier, while `index` holds the
/// secondary lookup structures that `build_index` populates and `check_integrity` verifies.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    /// Cache configuration (the provided one, or the default).
    config: CacheConfig,
    /// Secondary lookup indexes over accounts, orders and positions.
    index: CacheIndex,
    /// Optional database adapter the `cache_*` methods load from.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    /// General entries keyed by string, stored as raw bytes.
    general: AHashMap<String, Bytes>,
    /// Currencies keyed by `Ustr` (presumably the currency code; confirm against the accessors).
    currencies: AHashMap<Ustr, Currency>,
    /// Instruments keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    /// Synthetic instruments keyed by instrument ID.
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    /// Order books keyed by instrument ID.
    books: AHashMap<InstrumentId, OrderBook>,
    /// Own order books keyed by instrument ID.
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    /// Quote ticks per instrument, held in a deque.
    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
    /// Trade ticks per instrument, held in a deque.
    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
    /// `f64` values keyed by a pair of currencies (presumably mark exchange rates; confirm
    /// the pair ordering against the accessors).
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    /// Mark price updates per instrument, held in a deque.
    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    /// Index price updates per instrument, held in a deque.
    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    /// A single funding rate update per instrument.
    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
    /// Bars per bar type, held in a deque.
    bars: AHashMap<BarType, VecDeque<Bar>>,
    /// Greeks data keyed by instrument ID.
    greeks: AHashMap<InstrumentId, GreeksData>,
    /// Yield curve data keyed by string (presumably the curve name; confirm against callers).
    yield_curves: AHashMap<String, YieldCurveData>,
    /// Accounts keyed by account ID.
    accounts: AHashMap<AccountId, AccountAny>,
    /// Orders keyed by client order ID.
    orders: AHashMap<ClientOrderId, OrderAny>,
    /// Order lists keyed by order list ID.
    order_lists: AHashMap<OrderListId, OrderList>,
    /// Positions keyed by position ID.
    positions: AHashMap<PositionId, Position>,
    /// Position snapshot bytes keyed by position ID.
    position_snapshots: AHashMap<PositionId, Bytes>,
    /// DeFi-specific cache, only compiled in with the `defi` feature.
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
104
105impl Debug for Cache {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct(stringify!(Cache))
108            .field("config", &self.config)
109            .field("index", &self.index)
110            .field("general", &self.general)
111            .field("currencies", &self.currencies)
112            .field("instruments", &self.instruments)
113            .field("synthetics", &self.synthetics)
114            .field("books", &self.books)
115            .field("own_books", &self.own_books)
116            .field("quotes", &self.quotes)
117            .field("trades", &self.trades)
118            .field("mark_xrates", &self.mark_xrates)
119            .field("mark_prices", &self.mark_prices)
120            .field("index_prices", &self.index_prices)
121            .field("funding_rates", &self.funding_rates)
122            .field("bars", &self.bars)
123            .field("greeks", &self.greeks)
124            .field("yield_curves", &self.yield_curves)
125            .field("accounts", &self.accounts)
126            .field("orders", &self.orders)
127            .field("order_lists", &self.order_lists)
128            .field("positions", &self.positions)
129            .field("position_snapshots", &self.position_snapshots)
130            .finish()
131    }
132}
133
134impl Default for Cache {
135    /// Creates a new default [`Cache`] instance.
136    fn default() -> Self {
137        Self::new(Some(CacheConfig::default()), None)
138    }
139}
140
141impl Cache {
142    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
143    #[must_use]
144    /// # Note
145    ///
146    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
147    pub fn new(
148        config: Option<CacheConfig>,
149        database: Option<Box<dyn CacheDatabaseAdapter>>,
150    ) -> Self {
151        Self {
152            config: config.unwrap_or_default(),
153            index: CacheIndex::default(),
154            database,
155            general: AHashMap::new(),
156            currencies: AHashMap::new(),
157            instruments: AHashMap::new(),
158            synthetics: AHashMap::new(),
159            books: AHashMap::new(),
160            own_books: AHashMap::new(),
161            quotes: AHashMap::new(),
162            trades: AHashMap::new(),
163            mark_xrates: AHashMap::new(),
164            mark_prices: AHashMap::new(),
165            index_prices: AHashMap::new(),
166            funding_rates: AHashMap::new(),
167            bars: AHashMap::new(),
168            greeks: AHashMap::new(),
169            yield_curves: AHashMap::new(),
170            accounts: AHashMap::new(),
171            orders: AHashMap::new(),
172            order_lists: AHashMap::new(),
173            positions: AHashMap::new(),
174            position_snapshots: AHashMap::new(),
175            #[cfg(feature = "defi")]
176            defi: crate::defi::cache::DefiCache::default(),
177        }
178    }
179
180    /// Returns the cache instances memory address.
181    #[must_use]
182    pub fn memory_address(&self) -> String {
183        format!("{:?}", std::ptr::from_ref(self))
184    }
185
186    // -- COMMANDS --------------------------------------------------------------------------------
187
188    /// Clears and reloads general entries from the database into the cache.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if loading general cache data fails.
193    pub fn cache_general(&mut self) -> anyhow::Result<()> {
194        self.general = match &mut self.database {
195            Some(db) => db.load()?,
196            None => AHashMap::new(),
197        };
198
199        log::info!(
200            "Cached {} general object(s) from database",
201            self.general.len()
202        );
203        Ok(())
204    }
205
206    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if loading all cache data fails.
211    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
212        let cache_map = match &self.database {
213            Some(db) => db.load_all().await?,
214            None => CacheMap::default(),
215        };
216
217        self.currencies = cache_map.currencies;
218        self.instruments = cache_map.instruments;
219        self.synthetics = cache_map.synthetics;
220        self.accounts = cache_map.accounts;
221        self.orders = cache_map.orders;
222        self.positions = cache_map.positions;
223        Ok(())
224    }
225
226    /// Clears and reloads the currency cache from the database.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if loading currencies cache fails.
231    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
232        self.currencies = match &mut self.database {
233            Some(db) => db.load_currencies().await?,
234            None => AHashMap::new(),
235        };
236
237        log::info!("Cached {} currencies from database", self.general.len());
238        Ok(())
239    }
240
241    /// Clears and reloads the instrument cache from the database.
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if loading instruments cache fails.
246    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
247        self.instruments = match &mut self.database {
248            Some(db) => db.load_instruments().await?,
249            None => AHashMap::new(),
250        };
251
252        log::info!("Cached {} instruments from database", self.general.len());
253        Ok(())
254    }
255
256    /// Clears and reloads the synthetic instrument cache from the database.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if loading synthetic instruments cache fails.
261    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
262        self.synthetics = match &mut self.database {
263            Some(db) => db.load_synthetics().await?,
264            None => AHashMap::new(),
265        };
266
267        log::info!(
268            "Cached {} synthetic instruments from database",
269            self.general.len()
270        );
271        Ok(())
272    }
273
274    /// Clears and reloads the account cache from the database.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if loading accounts cache fails.
279    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
280        self.accounts = match &mut self.database {
281            Some(db) => db.load_accounts().await?,
282            None => AHashMap::new(),
283        };
284
285        log::info!(
286            "Cached {} synthetic instruments from database",
287            self.general.len()
288        );
289        Ok(())
290    }
291
292    /// Clears and reloads the order cache from the database.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if loading orders cache fails.
297    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
298        self.orders = match &mut self.database {
299            Some(db) => db.load_orders().await?,
300            None => AHashMap::new(),
301        };
302
303        log::info!("Cached {} orders from database", self.general.len());
304        Ok(())
305    }
306
307    /// Clears and reloads the position cache from the database.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if loading positions cache fails.
312    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
313        self.positions = match &mut self.database {
314            Some(db) => db.load_positions().await?,
315            None => AHashMap::new(),
316        };
317
318        log::info!("Cached {} positions from database", self.general.len());
319        Ok(())
320    }
321
322    /// Clears the current cache index and re-build.
323    pub fn build_index(&mut self) {
324        log::debug!("Building index");
325
326        // Index accounts
327        for account_id in self.accounts.keys() {
328            self.index
329                .venue_account
330                .insert(account_id.get_issuer(), *account_id);
331        }
332
333        // Index orders
334        for (client_order_id, order) in &self.orders {
335            let instrument_id = order.instrument_id();
336            let venue = instrument_id.venue;
337            let strategy_id = order.strategy_id();
338
339            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
340            self.index
341                .venue_orders
342                .entry(venue)
343                .or_default()
344                .insert(*client_order_id);
345
346            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
347            if let Some(venue_order_id) = order.venue_order_id() {
348                self.index
349                    .venue_order_ids
350                    .insert(venue_order_id, *client_order_id);
351            }
352
353            // 3: Build index.order_position -> {ClientOrderId, PositionId}
354            if let Some(position_id) = order.position_id() {
355                self.index
356                    .order_position
357                    .insert(*client_order_id, position_id);
358            }
359
360            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
361            self.index
362                .order_strategy
363                .insert(*client_order_id, order.strategy_id());
364
365            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
366            self.index
367                .instrument_orders
368                .entry(instrument_id)
369                .or_default()
370                .insert(*client_order_id);
371
372            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
373            self.index
374                .strategy_orders
375                .entry(strategy_id)
376                .or_default()
377                .insert(*client_order_id);
378
379            // 7: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
380            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
381                self.index
382                    .exec_algorithm_orders
383                    .entry(exec_algorithm_id)
384                    .or_default()
385                    .insert(*client_order_id);
386            }
387
388            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
389            if let Some(exec_spawn_id) = order.exec_spawn_id() {
390                self.index
391                    .exec_spawn_orders
392                    .entry(exec_spawn_id)
393                    .or_default()
394                    .insert(*client_order_id);
395            }
396
397            // 9: Build index.orders -> {ClientOrderId}
398            self.index.orders.insert(*client_order_id);
399
400            // 10: Build index.orders_open -> {ClientOrderId}
401            if order.is_open() {
402                self.index.orders_open.insert(*client_order_id);
403            }
404
405            // 11: Build index.orders_closed -> {ClientOrderId}
406            if order.is_closed() {
407                self.index.orders_closed.insert(*client_order_id);
408            }
409
410            // 12: Build index.orders_emulated -> {ClientOrderId}
411            if let Some(emulation_trigger) = order.emulation_trigger()
412                && emulation_trigger != TriggerType::NoTrigger
413                && !order.is_closed()
414            {
415                self.index.orders_emulated.insert(*client_order_id);
416            }
417
418            // 13: Build index.orders_inflight -> {ClientOrderId}
419            if order.is_inflight() {
420                self.index.orders_inflight.insert(*client_order_id);
421            }
422
423            // 14: Build index.strategies -> {StrategyId}
424            self.index.strategies.insert(strategy_id);
425
426            // 15: Build index.strategies -> {ExecAlgorithmId}
427            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
428                self.index.exec_algorithms.insert(exec_algorithm_id);
429            }
430        }
431
432        // Index positions
433        for (position_id, position) in &self.positions {
434            let instrument_id = position.instrument_id;
435            let venue = instrument_id.venue;
436            let strategy_id = position.strategy_id;
437
438            // 1: Build index.venue_positions -> {Venue, {PositionId}}
439            self.index
440                .venue_positions
441                .entry(venue)
442                .or_default()
443                .insert(*position_id);
444
445            // 2: Build index.position_strategy -> {PositionId, StrategyId}
446            self.index
447                .position_strategy
448                .insert(*position_id, position.strategy_id);
449
450            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
451            self.index
452                .position_orders
453                .entry(*position_id)
454                .or_default()
455                .extend(position.client_order_ids().into_iter());
456
457            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
458            self.index
459                .instrument_positions
460                .entry(instrument_id)
461                .or_default()
462                .insert(*position_id);
463
464            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
465            self.index
466                .strategy_positions
467                .entry(strategy_id)
468                .or_default()
469                .insert(*position_id);
470
471            // 6: Build index.positions -> {PositionId}
472            self.index.positions.insert(*position_id);
473
474            // 7: Build index.positions_open -> {PositionId}
475            if position.is_open() {
476                self.index.positions_open.insert(*position_id);
477            }
478
479            // 8: Build index.positions_closed -> {PositionId}
480            if position.is_closed() {
481                self.index.positions_closed.insert(*position_id);
482            }
483
484            // 9: Build index.strategies -> {StrategyId}
485            self.index.strategies.insert(strategy_id);
486        }
487    }
488
489    /// Returns whether the cache has a backing database.
490    #[must_use]
491    pub const fn has_backing(&self) -> bool {
492        self.config.database.is_some()
493    }
494
495    // Calculate the unrealized profit and loss (PnL) for `position`.
496    #[must_use]
497    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
498        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
499            quote
500        } else {
501            log::warn!(
502                "Cannot calculate unrealized PnL for {}, no quotes for {}",
503                position.id,
504                position.instrument_id
505            );
506            return None;
507        };
508
509        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
510        let last = match position.side {
511            PositionSide::Flat | PositionSide::NoPositionSide => {
512                return Some(Money::new(0.0, position.settlement_currency));
513            }
514            PositionSide::Long => quote.bid_price,
515            PositionSide::Short => quote.ask_price,
516        };
517
518        Some(position.unrealized_pnl(last))
519    }
520
521    /// Checks integrity of data within the cache.
522    ///
523    /// All data should be loaded from the database prior to this call.
524    /// If an error is found then a log error message will also be produced.
525    ///
526    /// # Panics
527    ///
528    /// Panics if failure calling system clock.
529    #[must_use]
530    pub fn check_integrity(&mut self) -> bool {
531        let mut error_count = 0;
532        let failure = "Integrity failure";
533
534        // Get current timestamp in microseconds
535        let timestamp_us = SystemTime::now()
536            .duration_since(UNIX_EPOCH)
537            .expect("Time went backwards")
538            .as_micros();
539
540        log::info!("Checking data integrity");
541
542        // Check object caches
543        for account_id in self.accounts.keys() {
544            if !self
545                .index
546                .venue_account
547                .contains_key(&account_id.get_issuer())
548            {
549                log::error!(
550                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
551                );
552                error_count += 1;
553            }
554        }
555
556        for (client_order_id, order) in &self.orders {
557            if !self.index.order_strategy.contains_key(client_order_id) {
558                log::error!(
559                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
560                );
561                error_count += 1;
562            }
563            if !self.index.orders.contains(client_order_id) {
564                log::error!(
565                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
566                );
567                error_count += 1;
568            }
569            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
570                log::error!(
571                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
572                );
573                error_count += 1;
574            }
575            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
576                log::error!(
577                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
578                );
579                error_count += 1;
580            }
581            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
582                log::error!(
583                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
584                );
585                error_count += 1;
586            }
587            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
588                if !self
589                    .index
590                    .exec_algorithm_orders
591                    .contains_key(&exec_algorithm_id)
592                {
593                    log::error!(
594                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
595                    );
596                    error_count += 1;
597                }
598                if order.exec_spawn_id().is_none()
599                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
600                {
601                    log::error!(
602                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
603                    );
604                    error_count += 1;
605                }
606            }
607        }
608
609        for (position_id, position) in &self.positions {
610            if !self.index.position_strategy.contains_key(position_id) {
611                log::error!(
612                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
613                );
614                error_count += 1;
615            }
616            if !self.index.position_orders.contains_key(position_id) {
617                log::error!(
618                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
619                );
620                error_count += 1;
621            }
622            if !self.index.positions.contains(position_id) {
623                log::error!(
624                    "{failure} in positions: {position_id} not found in `self.index.positions`",
625                );
626                error_count += 1;
627            }
628            if position.is_open() && !self.index.positions_open.contains(position_id) {
629                log::error!(
630                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
631                );
632                error_count += 1;
633            }
634            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
635                log::error!(
636                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
637                );
638                error_count += 1;
639            }
640        }
641
642        // Check indexes
643        for account_id in self.index.venue_account.values() {
644            if !self.accounts.contains_key(account_id) {
645                log::error!(
646                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
647                );
648                error_count += 1;
649            }
650        }
651
652        for client_order_id in self.index.venue_order_ids.values() {
653            if !self.orders.contains_key(client_order_id) {
654                log::error!(
655                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
656                );
657                error_count += 1;
658            }
659        }
660
661        for client_order_id in self.index.client_order_ids.keys() {
662            if !self.orders.contains_key(client_order_id) {
663                log::error!(
664                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
665                );
666                error_count += 1;
667            }
668        }
669
670        for client_order_id in self.index.order_position.keys() {
671            if !self.orders.contains_key(client_order_id) {
672                log::error!(
673                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
674                );
675                error_count += 1;
676            }
677        }
678
679        // Check indexes
680        for client_order_id in self.index.order_strategy.keys() {
681            if !self.orders.contains_key(client_order_id) {
682                log::error!(
683                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
684                );
685                error_count += 1;
686            }
687        }
688
689        for position_id in self.index.position_strategy.keys() {
690            if !self.positions.contains_key(position_id) {
691                log::error!(
692                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
693                );
694                error_count += 1;
695            }
696        }
697
698        for position_id in self.index.position_orders.keys() {
699            if !self.positions.contains_key(position_id) {
700                log::error!(
701                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
702                );
703                error_count += 1;
704            }
705        }
706
707        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
708            for client_order_id in client_order_ids {
709                if !self.orders.contains_key(client_order_id) {
710                    log::error!(
711                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
712                    );
713                    error_count += 1;
714                }
715            }
716        }
717
718        for instrument_id in self.index.instrument_positions.keys() {
719            if !self.index.instrument_orders.contains_key(instrument_id) {
720                log::error!(
721                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
722                );
723                error_count += 1;
724            }
725        }
726
727        for client_order_ids in self.index.strategy_orders.values() {
728            for client_order_id in client_order_ids {
729                if !self.orders.contains_key(client_order_id) {
730                    log::error!(
731                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
732                    );
733                    error_count += 1;
734                }
735            }
736        }
737
738        for position_ids in self.index.strategy_positions.values() {
739            for position_id in position_ids {
740                if !self.positions.contains_key(position_id) {
741                    log::error!(
742                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
743                    );
744                    error_count += 1;
745                }
746            }
747        }
748
749        for client_order_id in &self.index.orders {
750            if !self.orders.contains_key(client_order_id) {
751                log::error!(
752                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
753                );
754                error_count += 1;
755            }
756        }
757
758        for client_order_id in &self.index.orders_emulated {
759            if !self.orders.contains_key(client_order_id) {
760                log::error!(
761                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
762                );
763                error_count += 1;
764            }
765        }
766
767        for client_order_id in &self.index.orders_inflight {
768            if !self.orders.contains_key(client_order_id) {
769                log::error!(
770                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
771                );
772                error_count += 1;
773            }
774        }
775
776        for client_order_id in &self.index.orders_open {
777            if !self.orders.contains_key(client_order_id) {
778                log::error!(
779                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
780                );
781                error_count += 1;
782            }
783        }
784
785        for client_order_id in &self.index.orders_closed {
786            if !self.orders.contains_key(client_order_id) {
787                log::error!(
788                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
789                );
790                error_count += 1;
791            }
792        }
793
794        for position_id in &self.index.positions {
795            if !self.positions.contains_key(position_id) {
796                log::error!(
797                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
798                );
799                error_count += 1;
800            }
801        }
802
803        for position_id in &self.index.positions_open {
804            if !self.positions.contains_key(position_id) {
805                log::error!(
806                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
807                );
808                error_count += 1;
809            }
810        }
811
812        for position_id in &self.index.positions_closed {
813            if !self.positions.contains_key(position_id) {
814                log::error!(
815                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
816                );
817                error_count += 1;
818            }
819        }
820
821        for strategy_id in &self.index.strategies {
822            if !self.index.strategy_orders.contains_key(strategy_id) {
823                log::error!(
824                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
825                );
826                error_count += 1;
827            }
828        }
829
830        for exec_algorithm_id in &self.index.exec_algorithms {
831            if !self
832                .index
833                .exec_algorithm_orders
834                .contains_key(exec_algorithm_id)
835            {
836                log::error!(
837                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
838                );
839                error_count += 1;
840            }
841        }
842
843        let total_us = SystemTime::now()
844            .duration_since(UNIX_EPOCH)
845            .expect("Time went backwards")
846            .as_micros()
847            - timestamp_us;
848
849        if error_count == 0 {
850            log::info!("Integrity check passed in {total_us}μs");
851            true
852        } else {
853            log::error!(
854                "Integrity check failed with {error_count} error{} in {total_us}μs",
855                if error_count == 1 { "" } else { "s" },
856            );
857            false
858        }
859    }
860
861    /// Checks for any residual open state and log warnings if any are found.
862    ///
863    ///'Open state' is considered to be open orders and open positions.
864    #[must_use]
865    pub fn check_residuals(&self) -> bool {
866        log::debug!("Checking residuals");
867
868        let mut residuals = false;
869
870        // Check for any open orders
871        for order in self.orders_open(None, None, None, None) {
872            residuals = true;
873            log::warn!("Residual {order}");
874        }
875
876        // Check for any open positions
877        for position in self.positions_open(None, None, None, None) {
878            residuals = true;
879            log::warn!("Residual {position}");
880        }
881
882        residuals
883    }
884
885    /// Purges all closed orders from the cache that are older than `buffer_secs`.
886    ///
887    ///
888    /// Only orders that have been closed for at least this amount of time will be purged.
889    /// A value of 0 means purge all closed orders regardless of when they were closed.
890    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
891        log::debug!(
892            "Purging closed orders{}",
893            if buffer_secs > 0 {
894                format!(" with buffer_secs={buffer_secs}")
895            } else {
896                String::new()
897            }
898        );
899
900        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
901
902        'outer: for client_order_id in self.index.orders_closed.clone() {
903            if let Some(order) = self.orders.get(&client_order_id)
904                && order.is_closed()
905                && let Some(ts_closed) = order.ts_closed()
906                && ts_closed + buffer_ns <= ts_now
907            {
908                // Check any linked orders (contingency orders)
909                if let Some(linked_order_ids) = order.linked_order_ids() {
910                    for linked_order_id in linked_order_ids {
911                        if let Some(linked_order) = self.orders.get(linked_order_id)
912                            && linked_order.is_open()
913                        {
914                            // Do not purge if linked order still open
915                            continue 'outer;
916                        }
917                    }
918                }
919
920                self.purge_order(client_order_id);
921            }
922        }
923    }
924
925    /// Purges all closed positions from the cache that are older than `buffer_secs`.
926    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
927        log::debug!(
928            "Purging closed positions{}",
929            if buffer_secs > 0 {
930                format!(" with buffer_secs={buffer_secs}")
931            } else {
932                String::new()
933            }
934        );
935
936        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
937
938        for position_id in self.index.positions_closed.clone() {
939            if let Some(position) = self.positions.get(&position_id)
940                && position.is_closed()
941                && let Some(ts_closed) = position.ts_closed
942                && ts_closed + buffer_ns <= ts_now
943            {
944                self.purge_position(position_id);
945            }
946        }
947    }
948
949    /// Purges the order with the `client_order_id` from the cache (if found).
950    ///
951    /// For safety, an order is prevented from being purged if it's open.
952    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
953        // Check if order exists and is safe to purge before removing
954        let order = self.orders.get(&client_order_id).cloned();
955
956        // SAFETY: Prevent purging open orders
957        if let Some(ref ord) = order
958            && ord.is_open()
959        {
960            log::warn!("Order {client_order_id} found open when purging, skipping purge");
961            return;
962        }
963
964        // If order exists in cache, remove it and clean up order-specific indices
965        if let Some(ref ord) = order {
966            // Safe to purge
967            self.orders.remove(&client_order_id);
968
969            // Remove order from venue index
970            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
971            {
972                venue_orders.remove(&client_order_id);
973            }
974
975            // Remove venue order ID index if exists
976            if let Some(venue_order_id) = ord.venue_order_id() {
977                self.index.venue_order_ids.remove(&venue_order_id);
978            }
979
980            // Remove from instrument orders index
981            if let Some(instrument_orders) =
982                self.index.instrument_orders.get_mut(&ord.instrument_id())
983            {
984                instrument_orders.remove(&client_order_id);
985            }
986
987            // Remove from position orders index if associated with a position
988            if let Some(position_id) = ord.position_id()
989                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
990            {
991                position_orders.remove(&client_order_id);
992            }
993
994            // Remove from exec algorithm orders index if it has an exec algorithm
995            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
996                && let Some(exec_algorithm_orders) =
997                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
998            {
999                exec_algorithm_orders.remove(&client_order_id);
1000            }
1001
1002            // Clean up strategy orders reverse index
1003            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1004                strategy_orders.remove(&client_order_id);
1005                if strategy_orders.is_empty() {
1006                    self.index.strategy_orders.remove(&ord.strategy_id());
1007                }
1008            }
1009
1010            // Clean up exec spawn reverse index (if this order is a spawned child)
1011            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1012                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1013            {
1014                spawn_orders.remove(&client_order_id);
1015                if spawn_orders.is_empty() {
1016                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1017                }
1018            }
1019
1020            log::info!("Purged order {client_order_id}");
1021        } else {
1022            log::warn!("Order {client_order_id} not found when purging");
1023        }
1024
1025        // Always clean up order indices (even if order was not in cache)
1026        self.index.order_position.remove(&client_order_id);
1027        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1028        self.index.order_client.remove(&client_order_id);
1029        self.index.client_order_ids.remove(&client_order_id);
1030
1031        // Clean up reverse index when order not in cache (using forward index)
1032        if let Some(strategy_id) = strategy_id
1033            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1034        {
1035            strategy_orders.remove(&client_order_id);
1036            if strategy_orders.is_empty() {
1037                self.index.strategy_orders.remove(&strategy_id);
1038            }
1039        }
1040
1041        // Remove spawn parent entry if this order was a spawn root
1042        self.index.exec_spawn_orders.remove(&client_order_id);
1043
1044        self.index.orders.remove(&client_order_id);
1045        self.index.orders_closed.remove(&client_order_id);
1046        self.index.orders_emulated.remove(&client_order_id);
1047        self.index.orders_inflight.remove(&client_order_id);
1048        self.index.orders_pending_cancel.remove(&client_order_id);
1049    }
1050
1051    /// Purges the position with the `position_id` from the cache (if found).
1052    ///
1053    /// For safety, a position is prevented from being purged if it's open.
1054    pub fn purge_position(&mut self, position_id: PositionId) {
1055        // Check if position exists and is safe to purge before removing
1056        let position = self.positions.get(&position_id).cloned();
1057
1058        // SAFETY: Prevent purging open positions
1059        if let Some(ref pos) = position
1060            && pos.is_open()
1061        {
1062            log::warn!("Position {position_id} found open when purging, skipping purge");
1063            return;
1064        }
1065
1066        // If position exists in cache, remove it and clean up position-specific indices
1067        if let Some(ref pos) = position {
1068            self.positions.remove(&position_id);
1069
1070            // Remove from venue positions index
1071            if let Some(venue_positions) =
1072                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1073            {
1074                venue_positions.remove(&position_id);
1075            }
1076
1077            // Remove from instrument positions index
1078            if let Some(instrument_positions) =
1079                self.index.instrument_positions.get_mut(&pos.instrument_id)
1080            {
1081                instrument_positions.remove(&position_id);
1082            }
1083
1084            // Remove from strategy positions index
1085            if let Some(strategy_positions) =
1086                self.index.strategy_positions.get_mut(&pos.strategy_id)
1087            {
1088                strategy_positions.remove(&position_id);
1089            }
1090
1091            // Remove position ID from orders that reference it
1092            for client_order_id in pos.client_order_ids() {
1093                self.index.order_position.remove(&client_order_id);
1094            }
1095
1096            log::info!("Purged position {position_id}");
1097        } else {
1098            log::warn!("Position {position_id} not found when purging");
1099        }
1100
1101        // Always clean up position indices (even if position not in cache)
1102        self.index.position_strategy.remove(&position_id);
1103        self.index.position_orders.remove(&position_id);
1104        self.index.positions.remove(&position_id);
1105        self.index.positions_open.remove(&position_id);
1106        self.index.positions_closed.remove(&position_id);
1107
1108        // Always clean up position snapshots (even if position not in cache)
1109        self.position_snapshots.remove(&position_id);
1110    }
1111
1112    /// Purges all account state events which are outside the lookback window.
1113    ///
1114    /// Only events which are outside the lookback window will be purged.
1115    /// A value of 0 means purge all account state events.
1116    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1117        log::debug!(
1118            "Purging account events{}",
1119            if lookback_secs > 0 {
1120                format!(" with lookback_secs={lookback_secs}")
1121            } else {
1122                String::new()
1123            }
1124        );
1125
1126        for account in self.accounts.values_mut() {
1127            let event_count = account.event_count();
1128            account.purge_account_events(ts_now, lookback_secs);
1129            let count_diff = event_count - account.event_count();
1130            if count_diff > 0 {
1131                log::info!(
1132                    "Purged {} event(s) from account {}",
1133                    count_diff,
1134                    account.id()
1135                );
1136            }
1137        }
1138    }
1139
1140    /// Clears the caches index.
1141    pub fn clear_index(&mut self) {
1142        self.index.clear();
1143        log::debug!("Cleared index");
1144    }
1145
1146    /// Resets the cache.
1147    ///
1148    /// All stateful fields are reset to their initial value.
1149    pub fn reset(&mut self) {
1150        log::debug!("Resetting cache");
1151
1152        self.general.clear();
1153        self.currencies.clear();
1154        self.instruments.clear();
1155        self.synthetics.clear();
1156        self.books.clear();
1157        self.own_books.clear();
1158        self.quotes.clear();
1159        self.trades.clear();
1160        self.mark_xrates.clear();
1161        self.mark_prices.clear();
1162        self.index_prices.clear();
1163        self.bars.clear();
1164        self.accounts.clear();
1165        self.orders.clear();
1166        self.order_lists.clear();
1167        self.positions.clear();
1168        self.position_snapshots.clear();
1169        self.greeks.clear();
1170        self.yield_curves.clear();
1171
1172        #[cfg(feature = "defi")]
1173        {
1174            self.defi.pools.clear();
1175            self.defi.pool_profilers.clear();
1176        }
1177
1178        self.clear_index();
1179
1180        log::info!("Reset cache");
1181    }
1182
1183    /// Dispose of the cache which will close any underlying database adapter.
1184    ///
1185    /// If closing the database connection fails, an error is logged.
1186    pub fn dispose(&mut self) {
1187        if let Some(database) = &mut self.database
1188            && let Err(e) = database.close()
1189        {
1190            log::error!("Failed to close database during dispose: {e}");
1191        }
1192    }
1193
1194    /// Flushes the caches database which permanently removes all persisted data.
1195    ///
1196    /// If flushing the database connection fails, an error is logged.
1197    pub fn flush_db(&mut self) {
1198        if let Some(database) = &mut self.database
1199            && let Err(e) = database.flush()
1200        {
1201            log::error!("Failed to flush database: {e}");
1202        }
1203    }
1204
1205    /// Adds a raw bytes `value` to the cache under the `key`.
1206    ///
1207    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1208    ///
1209    /// # Errors
1210    ///
1211    /// Returns an error if persisting the entry to the backing database fails.
1212    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1213        check_valid_string_ascii(key, stringify!(key))?;
1214        check_predicate_false(value.is_empty(), stringify!(value))?;
1215
1216        log::debug!("Adding general {key}");
1217        self.general.insert(key.to_string(), value.clone());
1218
1219        if let Some(database) = &mut self.database {
1220            database.add(key.to_string(), value)?;
1221        }
1222        Ok(())
1223    }
1224
1225    /// Adds an `OrderBook` to the cache.
1226    ///
1227    /// # Errors
1228    ///
1229    /// Returns an error if persisting the order book to the backing database fails.
1230    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1231        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1232
1233        if self.config.save_market_data
1234            && let Some(database) = &mut self.database
1235        {
1236            database.add_order_book(&book)?;
1237        }
1238
1239        self.books.insert(book.instrument_id, book);
1240        Ok(())
1241    }
1242
1243    /// Adds an `OwnOrderBook` to the cache.
1244    ///
1245    /// # Errors
1246    ///
1247    /// Returns an error if persisting the own order book fails.
1248    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1249        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1250
1251        self.own_books.insert(own_book.instrument_id, own_book);
1252        Ok(())
1253    }
1254
1255    /// Adds the `mark_price` update to the cache.
1256    ///
1257    /// # Errors
1258    ///
1259    /// Returns an error if persisting the mark price to the backing database fails.
1260    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1261        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1262
1263        if self.config.save_market_data {
1264            // TODO: Placeholder and return Result for consistency
1265        }
1266
1267        let mark_prices_deque = self
1268            .mark_prices
1269            .entry(mark_price.instrument_id)
1270            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1271        mark_prices_deque.push_front(mark_price);
1272        Ok(())
1273    }
1274
1275    /// Adds the `index_price` update to the cache.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns an error if persisting the index price to the backing database fails.
1280    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1281        log::debug!(
1282            "Adding `IndexPriceUpdate` for {}",
1283            index_price.instrument_id
1284        );
1285
1286        if self.config.save_market_data {
1287            // TODO: Placeholder and return Result for consistency
1288        }
1289
1290        let index_prices_deque = self
1291            .index_prices
1292            .entry(index_price.instrument_id)
1293            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1294        index_prices_deque.push_front(index_price);
1295        Ok(())
1296    }
1297
1298    /// Adds the `funding_rate` update to the cache.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns an error if persisting the funding rate update to the backing database fails.
1303    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1304        log::debug!(
1305            "Adding `FundingRateUpdate` for {}",
1306            funding_rate.instrument_id
1307        );
1308
1309        if self.config.save_market_data {
1310            // TODO: Placeholder and return Result for consistency
1311        }
1312
1313        self.funding_rates
1314            .insert(funding_rate.instrument_id, funding_rate);
1315        Ok(())
1316    }
1317
1318    /// Adds the `quote` tick to the cache.
1319    ///
1320    /// # Errors
1321    ///
1322    /// Returns an error if persisting the quote tick to the backing database fails.
1323    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1324        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1325
1326        if self.config.save_market_data
1327            && let Some(database) = &mut self.database
1328        {
1329            database.add_quote(&quote)?;
1330        }
1331
1332        let quotes_deque = self
1333            .quotes
1334            .entry(quote.instrument_id)
1335            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1336        quotes_deque.push_front(quote);
1337        Ok(())
1338    }
1339
1340    /// Adds the `quotes` to the cache.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if persisting the quote ticks to the backing database fails.
1345    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1346        check_slice_not_empty(quotes, stringify!(quotes))?;
1347
1348        let instrument_id = quotes[0].instrument_id;
1349        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1350
1351        if self.config.save_market_data
1352            && let Some(database) = &mut self.database
1353        {
1354            for quote in quotes {
1355                database.add_quote(quote)?;
1356            }
1357        }
1358
1359        let quotes_deque = self
1360            .quotes
1361            .entry(instrument_id)
1362            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1363
1364        for quote in quotes {
1365            quotes_deque.push_front(*quote);
1366        }
1367        Ok(())
1368    }
1369
1370    /// Adds the `trade` tick to the cache.
1371    ///
1372    /// # Errors
1373    ///
1374    /// Returns an error if persisting the trade tick to the backing database fails.
1375    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1376        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1377
1378        if self.config.save_market_data
1379            && let Some(database) = &mut self.database
1380        {
1381            database.add_trade(&trade)?;
1382        }
1383
1384        let trades_deque = self
1385            .trades
1386            .entry(trade.instrument_id)
1387            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1388        trades_deque.push_front(trade);
1389        Ok(())
1390    }
1391
1392    /// Adds the give `trades` to the cache.
1393    ///
1394    /// # Errors
1395    ///
1396    /// Returns an error if persisting the trade ticks to the backing database fails.
1397    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1398        check_slice_not_empty(trades, stringify!(trades))?;
1399
1400        let instrument_id = trades[0].instrument_id;
1401        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1402
1403        if self.config.save_market_data
1404            && let Some(database) = &mut self.database
1405        {
1406            for trade in trades {
1407                database.add_trade(trade)?;
1408            }
1409        }
1410
1411        let trades_deque = self
1412            .trades
1413            .entry(instrument_id)
1414            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1415
1416        for trade in trades {
1417            trades_deque.push_front(*trade);
1418        }
1419        Ok(())
1420    }
1421
1422    /// Adds the `bar` to the cache.
1423    ///
1424    /// # Errors
1425    ///
1426    /// Returns an error if persisting the bar to the backing database fails.
1427    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1428        log::debug!("Adding `Bar` {}", bar.bar_type);
1429
1430        if self.config.save_market_data
1431            && let Some(database) = &mut self.database
1432        {
1433            database.add_bar(&bar)?;
1434        }
1435
1436        let bars = self
1437            .bars
1438            .entry(bar.bar_type)
1439            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1440        bars.push_front(bar);
1441        Ok(())
1442    }
1443
1444    /// Adds the `bars` to the cache.
1445    ///
1446    /// # Errors
1447    ///
1448    /// Returns an error if persisting the bars to the backing database fails.
1449    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1450        check_slice_not_empty(bars, stringify!(bars))?;
1451
1452        let bar_type = bars[0].bar_type;
1453        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1454
1455        if self.config.save_market_data
1456            && let Some(database) = &mut self.database
1457        {
1458            for bar in bars {
1459                database.add_bar(bar)?;
1460            }
1461        }
1462
1463        let bars_deque = self
1464            .bars
1465            .entry(bar_type)
1466            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1467
1468        for bar in bars {
1469            bars_deque.push_front(*bar);
1470        }
1471        Ok(())
1472    }
1473
1474    /// Adds the `greeks` data to the cache.
1475    ///
1476    /// # Errors
1477    ///
1478    /// Returns an error if persisting the greeks data to the backing database fails.
1479    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1480        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1481
1482        if self.config.save_market_data
1483            && let Some(_database) = &mut self.database
1484        {
1485            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1486        }
1487
1488        self.greeks.insert(greeks.instrument_id, greeks);
1489        Ok(())
1490    }
1491
1492    /// Gets the greeks data for the `instrument_id`.
1493    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1494        self.greeks.get(instrument_id).cloned()
1495    }
1496
1497    /// Adds the `yield_curve` data to the cache.
1498    ///
1499    /// # Errors
1500    ///
1501    /// Returns an error if persisting the yield curve data to the backing database fails.
1502    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1503        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1504
1505        if self.config.save_market_data
1506            && let Some(_database) = &mut self.database
1507        {
1508            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1509        }
1510
1511        self.yield_curves
1512            .insert(yield_curve.curve_name.clone(), yield_curve);
1513        Ok(())
1514    }
1515
1516    /// Gets the yield curve for the `key`.
1517    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1518        self.yield_curves.get(key).map(|curve| {
1519            let curve_clone = curve.clone();
1520            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1521                as Box<dyn Fn(f64) -> f64>
1522        })
1523    }
1524
1525    /// Adds the `currency` to the cache.
1526    ///
1527    /// # Errors
1528    ///
1529    /// Returns an error if persisting the currency to the backing database fails.
1530    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1531        log::debug!("Adding `Currency` {}", currency.code);
1532
1533        if let Some(database) = &mut self.database {
1534            database.add_currency(&currency)?;
1535        }
1536
1537        self.currencies.insert(currency.code, currency);
1538        Ok(())
1539    }
1540
1541    /// Adds the `instrument` to the cache.
1542    ///
1543    /// # Errors
1544    ///
1545    /// Returns an error if persisting the instrument to the backing database fails.
1546    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1547        log::debug!("Adding `Instrument` {}", instrument.id());
1548
1549        if let Some(database) = &mut self.database {
1550            database.add_instrument(&instrument)?;
1551        }
1552
1553        self.instruments.insert(instrument.id(), instrument);
1554        Ok(())
1555    }
1556
1557    /// Adds the `synthetic` instrument to the cache.
1558    ///
1559    /// # Errors
1560    ///
1561    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1562    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1563        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1564
1565        if let Some(database) = &mut self.database {
1566            database.add_synthetic(&synthetic)?;
1567        }
1568
1569        self.synthetics.insert(synthetic.id, synthetic);
1570        Ok(())
1571    }
1572
1573    /// Adds the `account` to the cache.
1574    ///
1575    /// # Errors
1576    ///
1577    /// Returns an error if persisting the account to the backing database fails.
1578    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1579        log::debug!("Adding `Account` {}", account.id());
1580
1581        if let Some(database) = &mut self.database {
1582            database.add_account(&account)?;
1583        }
1584
1585        let account_id = account.id();
1586        self.accounts.insert(account_id, account);
1587        self.index
1588            .venue_account
1589            .insert(account_id.get_issuer(), account_id);
1590        Ok(())
1591    }
1592
1593    /// Indexes the `client_order_id` with the `venue_order_id`.
1594    ///
1595    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1596    ///
1597    /// # Errors
1598    ///
1599    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1600    pub fn add_venue_order_id(
1601        &mut self,
1602        client_order_id: &ClientOrderId,
1603        venue_order_id: &VenueOrderId,
1604        overwrite: bool,
1605    ) -> anyhow::Result<()> {
1606        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1607            && !overwrite
1608            && existing_venue_order_id != venue_order_id
1609        {
1610            anyhow::bail!(
1611                "Existing {existing_venue_order_id} for {client_order_id}
1612                    did not match the given {venue_order_id}.
1613                    If you are writing a test then try a different `venue_order_id`,
1614                    otherwise this is probably a bug."
1615            );
1616        }
1617
1618        self.index
1619            .client_order_ids
1620            .insert(*client_order_id, *venue_order_id);
1621        self.index
1622            .venue_order_ids
1623            .insert(*venue_order_id, *client_order_id);
1624
1625        Ok(())
1626    }
1627
1628    /// Adds the `order` to the cache indexed with any given identifiers.
1629    ///
1630    /// # Parameters
1631    ///
    /// `replace_existing`: If the added order should 'override' any existing order and replace
1633    /// it in the cache. This is currently used for emulated orders which are
1634    /// being released and transformed into another type.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1639    pub fn add_order(
1640        &mut self,
1641        order: OrderAny,
1642        position_id: Option<PositionId>,
1643        client_id: Option<ClientId>,
1644        replace_existing: bool,
1645    ) -> anyhow::Result<()> {
1646        let instrument_id = order.instrument_id();
1647        let venue = instrument_id.venue;
1648        let client_order_id = order.client_order_id();
1649        let strategy_id = order.strategy_id();
1650        let exec_algorithm_id = order.exec_algorithm_id();
1651        let exec_spawn_id = order.exec_spawn_id();
1652
1653        if !replace_existing {
1654            check_key_not_in_map(
1655                &client_order_id,
1656                &self.orders,
1657                stringify!(client_order_id),
1658                stringify!(orders),
1659            )?;
1660        }
1661
1662        log::debug!("Adding {order:?}");
1663
1664        self.index.orders.insert(client_order_id);
1665        self.index
1666            .order_strategy
1667            .insert(client_order_id, strategy_id);
1668        self.index.strategies.insert(strategy_id);
1669
1670        // Update venue -> orders index
1671        self.index
1672            .venue_orders
1673            .entry(venue)
1674            .or_default()
1675            .insert(client_order_id);
1676
1677        // Update instrument -> orders index
1678        self.index
1679            .instrument_orders
1680            .entry(instrument_id)
1681            .or_default()
1682            .insert(client_order_id);
1683
1684        // Update strategy -> orders index
1685        self.index
1686            .strategy_orders
1687            .entry(strategy_id)
1688            .or_default()
1689            .insert(client_order_id);
1690
1691        // Update exec_algorithm -> orders index
1692        if let Some(exec_algorithm_id) = exec_algorithm_id {
1693            self.index.exec_algorithms.insert(exec_algorithm_id);
1694
1695            self.index
1696                .exec_algorithm_orders
1697                .entry(exec_algorithm_id)
1698                .or_default()
1699                .insert(client_order_id);
1700        }
1701
1702        // Update exec_spawn -> orders index
1703        if let Some(exec_spawn_id) = exec_spawn_id {
1704            self.index
1705                .exec_spawn_orders
1706                .entry(exec_spawn_id)
1707                .or_default()
1708                .insert(client_order_id);
1709        }
1710
1711        // Update emulation index
1712        match order.emulation_trigger() {
1713            Some(_) => {
1714                self.index.orders_emulated.remove(&client_order_id);
1715            }
1716            None => {
1717                self.index.orders_emulated.insert(client_order_id);
1718            }
1719        }
1720
1721        // Index position ID if provided
1722        if let Some(position_id) = position_id {
1723            self.add_position_id(
1724                &position_id,
1725                &order.instrument_id().venue,
1726                &client_order_id,
1727                &strategy_id,
1728            )?;
1729        }
1730
1731        // Index client ID if provided
1732        if let Some(client_id) = client_id {
1733            self.index.order_client.insert(client_order_id, client_id);
1734            log::debug!("Indexed {client_id:?}");
1735        }
1736
1737        if let Some(database) = &mut self.database {
1738            database.add_order(&order, client_id)?;
1739            // TODO: Implement
1740            // if self.config.snapshot_orders {
1741            //     database.snapshot_order_state(order)?;
1742            // }
1743        }
1744
1745        self.orders.insert(client_order_id, order);
1746
1747        Ok(())
1748    }
1749
1750    /// Indexes the `position_id` with the other given IDs.
1751    ///
1752    /// # Errors
1753    ///
1754    /// Returns an error if indexing position ID in the backing database fails.
1755    pub fn add_position_id(
1756        &mut self,
1757        position_id: &PositionId,
1758        venue: &Venue,
1759        client_order_id: &ClientOrderId,
1760        strategy_id: &StrategyId,
1761    ) -> anyhow::Result<()> {
1762        self.index
1763            .order_position
1764            .insert(*client_order_id, *position_id);
1765
1766        // Index: ClientOrderId -> PositionId
1767        if let Some(database) = &mut self.database {
1768            database.index_order_position(*client_order_id, *position_id)?;
1769        }
1770
1771        // Index: PositionId -> StrategyId
1772        self.index
1773            .position_strategy
1774            .insert(*position_id, *strategy_id);
1775
1776        // Index: PositionId -> set[ClientOrderId]
1777        self.index
1778            .position_orders
1779            .entry(*position_id)
1780            .or_default()
1781            .insert(*client_order_id);
1782
1783        // Index: StrategyId -> set[PositionId]
1784        self.index
1785            .strategy_positions
1786            .entry(*strategy_id)
1787            .or_default()
1788            .insert(*position_id);
1789
1790        // Index: Venue -> set[PositionId]
1791        self.index
1792            .venue_positions
1793            .entry(*venue)
1794            .or_default()
1795            .insert(*position_id);
1796
1797        Ok(())
1798    }
1799
1800    /// Adds the `position` to the cache.
1801    ///
1802    /// # Errors
1803    ///
1804    /// Returns an error if persisting the position to the backing database fails.
1805    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1806        self.positions.insert(position.id, position.clone());
1807        self.index.positions.insert(position.id);
1808        self.index.positions_open.insert(position.id);
1809        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
1810
1811        log::debug!("Adding {position}");
1812
1813        self.add_position_id(
1814            &position.id,
1815            &position.instrument_id.venue,
1816            &position.opening_order_id,
1817            &position.strategy_id,
1818        )?;
1819
1820        let venue = position.instrument_id.venue;
1821        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1822        venue_positions.insert(position.id);
1823
1824        // Index: InstrumentId -> AHashSet
1825        let instrument_id = position.instrument_id;
1826        let instrument_positions = self
1827            .index
1828            .instrument_positions
1829            .entry(instrument_id)
1830            .or_default();
1831        instrument_positions.insert(position.id);
1832
1833        if let Some(database) = &mut self.database {
1834            database.add_position(&position)?;
1835            // TODO: Implement position snapshots
1836            // if self.snapshot_positions {
1837            //     database.snapshot_position_state(
1838            //         position,
1839            //         position.ts_last,
1840            //         self.calculate_unrealized_pnl(&position),
1841            //     )?;
1842            // }
1843        }
1844
1845        Ok(())
1846    }
1847
1848    /// Updates the `account` in the cache.
1849    ///
1850    /// # Errors
1851    ///
1852    /// Returns an error if updating the account in the database fails.
1853    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1854        if let Some(database) = &mut self.database {
1855            database.update_account(&account)?;
1856        }
1857        Ok(())
1858    }
1859
1860    /// Updates the `order` in the cache.
1861    ///
1862    /// # Errors
1863    ///
1864    /// Returns an error if updating the order in the database fails.
1865    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1866        let client_order_id = order.client_order_id();
1867
1868        // Update venue order ID
1869        if let Some(venue_order_id) = order.venue_order_id() {
1870            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
1871            // venues which use a cancel+replace update strategy.
1872            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
1873                // TODO: If the last event was `OrderUpdated` then overwrite should be true
1874                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
1875            }
1876        }
1877
1878        // Update in-flight state
1879        if order.is_inflight() {
1880            self.index.orders_inflight.insert(client_order_id);
1881        } else {
1882            self.index.orders_inflight.remove(&client_order_id);
1883        }
1884
1885        // Update open/closed state
1886        if order.is_open() {
1887            self.index.orders_closed.remove(&client_order_id);
1888            self.index.orders_open.insert(client_order_id);
1889        } else if order.is_closed() {
1890            self.index.orders_open.remove(&client_order_id);
1891            self.index.orders_pending_cancel.remove(&client_order_id);
1892            self.index.orders_closed.insert(client_order_id);
1893        }
1894
1895        // Update emulation
1896        if let Some(emulation_trigger) = order.emulation_trigger() {
1897            match emulation_trigger {
1898                TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id),
1899                _ => self.index.orders_emulated.insert(client_order_id),
1900            };
1901        }
1902
1903        // Update own book
1904        if self.own_order_book(&order.instrument_id()).is_some()
1905            && should_handle_own_book_order(order)
1906        {
1907            self.update_own_order_book(order);
1908        }
1909
1910        if let Some(database) = &mut self.database {
1911            database.update_order(order.last_event())?;
1912            // TODO: Implement order snapshots
1913            // if self.snapshot_orders {
1914            //     database.snapshot_order_state(order)?;
1915            // }
1916        }
1917
1918        // update the order in the cache
1919        self.orders.insert(client_order_id, order.clone());
1920
1921        Ok(())
1922    }
1923
1924    /// Updates the `order` as pending cancel locally.
1925    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
1926        self.index
1927            .orders_pending_cancel
1928            .insert(order.client_order_id());
1929    }
1930
1931    /// Updates the `position` in the cache.
1932    ///
1933    /// # Errors
1934    ///
1935    /// Returns an error if updating the position in the database fails.
1936    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
1937        // Update open/closed state
1938
1939        if position.is_open() {
1940            self.index.positions_open.insert(position.id);
1941            self.index.positions_closed.remove(&position.id);
1942        } else {
1943            self.index.positions_closed.insert(position.id);
1944            self.index.positions_open.remove(&position.id);
1945        }
1946
1947        if let Some(database) = &mut self.database {
1948            database.update_position(position)?;
1949            // TODO: Implement order snapshots
1950            // if self.snapshot_orders {
1951            //     database.snapshot_order_state(order)?;
1952            // }
1953        }
1954
1955        self.positions.insert(position.id, position.clone());
1956
1957        Ok(())
1958    }
1959
1960    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
1961    /// serializing it, and storing it in the position snapshots.
1962    ///
1963    /// # Errors
1964    ///
1965    /// Returns an error if serializing or storing the position snapshot fails.
1966    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
1967        let position_id = position.id;
1968
1969        let mut copied_position = position.clone();
1970        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
1971        copied_position.id = PositionId::new(new_id);
1972
1973        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
1974        let position_serialized = serde_json::to_vec(&copied_position)?;
1975
1976        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
1977        let new_snapshots = match snapshots {
1978            Some(existing_snapshots) => {
1979                let mut combined = existing_snapshots.to_vec();
1980                combined.extend(position_serialized);
1981                Bytes::from(combined)
1982            }
1983            None => Bytes::from(position_serialized),
1984        };
1985        self.position_snapshots.insert(position_id, new_snapshots);
1986
1987        log::debug!("Snapshot {copied_position}");
1988        Ok(())
1989    }
1990
1991    /// Creates a snapshot of the `position` state in the database.
1992    ///
1993    /// # Errors
1994    ///
1995    /// Returns an error if snapshotting the position state fails.
1996    pub fn snapshot_position_state(
1997        &mut self,
1998        position: &Position,
1999        // ts_snapshot: u64,
2000        // unrealized_pnl: Option<Money>,
2001        open_only: Option<bool>,
2002    ) -> anyhow::Result<()> {
2003        let open_only = open_only.unwrap_or(true);
2004
2005        if open_only && !position.is_open() {
2006            return Ok(());
2007        }
2008
2009        if let Some(database) = &mut self.database {
2010            database.snapshot_position_state(position).map_err(|e| {
2011                log::error!(
2012                    "Failed to snapshot position state for {}: {e:?}",
2013                    position.id
2014                );
2015                e
2016            })?;
2017        } else {
2018            log::warn!(
2019                "Cannot snapshot position state for {} (no database configured)",
2020                position.id
2021            );
2022        }
2023
2024        // Ok(())
2025        todo!()
2026    }
2027
2028    /// Gets the OMS type for the `position_id`.
2029    #[must_use]
2030    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2031        // Get OMS type from the index
2032        if self.index.position_strategy.contains_key(position_id) {
2033            // For now, we'll default to NETTING
2034            // TODO: Store and retrieve actual OMS type per position
2035            Some(OmsType::Netting)
2036        } else {
2037            None
2038        }
2039    }
2040
2041    /// Gets position snapshot bytes for the `position_id`.
2042    #[must_use]
2043    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2044        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2045    }
2046
2047    /// Gets position snapshot IDs for the `instrument_id`.
2048    #[must_use]
2049    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2050        // Get snapshot position IDs that match the instrument
2051        let mut result = AHashSet::new();
2052        for (position_id, _) in &self.position_snapshots {
2053            // Check if this position is for the requested instrument
2054            if let Some(position) = self.positions.get(position_id)
2055                && position.instrument_id == *instrument_id
2056            {
2057                result.insert(*position_id);
2058            }
2059        }
2060        result
2061    }
2062
2063    /// Snapshots the `order` state in the database.
2064    ///
2065    /// # Errors
2066    ///
2067    /// Returns an error if snapshotting the order state fails.
2068    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2069        let database = if let Some(database) = &self.database {
2070            database
2071        } else {
2072            log::warn!(
2073                "Cannot snapshot order state for {} (no database configured)",
2074                order.client_order_id()
2075            );
2076            return Ok(());
2077        };
2078
2079        database.snapshot_order_state(order)
2080    }
2081
2082    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2083
2084    fn build_order_query_filter_set(
2085        &self,
2086        venue: Option<&Venue>,
2087        instrument_id: Option<&InstrumentId>,
2088        strategy_id: Option<&StrategyId>,
2089    ) -> Option<AHashSet<ClientOrderId>> {
2090        let mut query: Option<AHashSet<ClientOrderId>> = None;
2091
2092        if let Some(venue) = venue {
2093            query = Some(
2094                self.index
2095                    .venue_orders
2096                    .get(venue)
2097                    .cloned()
2098                    .unwrap_or_default(),
2099            );
2100        }
2101
2102        if let Some(instrument_id) = instrument_id {
2103            let instrument_orders = self
2104                .index
2105                .instrument_orders
2106                .get(instrument_id)
2107                .cloned()
2108                .unwrap_or_default();
2109
2110            if let Some(existing_query) = &mut query {
2111                *existing_query = existing_query
2112                    .intersection(&instrument_orders)
2113                    .copied()
2114                    .collect();
2115            } else {
2116                query = Some(instrument_orders);
2117            }
2118        }
2119
2120        if let Some(strategy_id) = strategy_id {
2121            let strategy_orders = self
2122                .index
2123                .strategy_orders
2124                .get(strategy_id)
2125                .cloned()
2126                .unwrap_or_default();
2127
2128            if let Some(existing_query) = &mut query {
2129                *existing_query = existing_query
2130                    .intersection(&strategy_orders)
2131                    .copied()
2132                    .collect();
2133            } else {
2134                query = Some(strategy_orders);
2135            }
2136        }
2137
2138        query
2139    }
2140
2141    fn build_position_query_filter_set(
2142        &self,
2143        venue: Option<&Venue>,
2144        instrument_id: Option<&InstrumentId>,
2145        strategy_id: Option<&StrategyId>,
2146    ) -> Option<AHashSet<PositionId>> {
2147        let mut query: Option<AHashSet<PositionId>> = None;
2148
2149        if let Some(venue) = venue {
2150            query = Some(
2151                self.index
2152                    .venue_positions
2153                    .get(venue)
2154                    .cloned()
2155                    .unwrap_or_default(),
2156            );
2157        }
2158
2159        if let Some(instrument_id) = instrument_id {
2160            let instrument_positions = self
2161                .index
2162                .instrument_positions
2163                .get(instrument_id)
2164                .cloned()
2165                .unwrap_or_default();
2166
2167            if let Some(existing_query) = query {
2168                query = Some(
2169                    existing_query
2170                        .intersection(&instrument_positions)
2171                        .copied()
2172                        .collect(),
2173                );
2174            } else {
2175                query = Some(instrument_positions);
2176            }
2177        }
2178
2179        if let Some(strategy_id) = strategy_id {
2180            let strategy_positions = self
2181                .index
2182                .strategy_positions
2183                .get(strategy_id)
2184                .cloned()
2185                .unwrap_or_default();
2186
2187            if let Some(existing_query) = query {
2188                query = Some(
2189                    existing_query
2190                        .intersection(&strategy_positions)
2191                        .copied()
2192                        .collect(),
2193                );
2194            } else {
2195                query = Some(strategy_positions);
2196            }
2197        }
2198
2199        query
2200    }
2201
2202    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2203    ///
2204    /// # Panics
2205    ///
2206    /// Panics if any `client_order_id` in the set is not found in the cache.
2207    fn get_orders_for_ids(
2208        &self,
2209        client_order_ids: &AHashSet<ClientOrderId>,
2210        side: Option<OrderSide>,
2211    ) -> Vec<&OrderAny> {
2212        let side = side.unwrap_or(OrderSide::NoOrderSide);
2213        let mut orders = Vec::new();
2214
2215        for client_order_id in client_order_ids {
2216            let order = self
2217                .orders
2218                .get(client_order_id)
2219                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2220            if side == OrderSide::NoOrderSide || side == order.order_side() {
2221                orders.push(order);
2222            }
2223        }
2224
2225        orders
2226    }
2227
2228    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2229    ///
2230    /// # Panics
2231    ///
2232    /// Panics if any `position_id` in the set is not found in the cache.
2233    fn get_positions_for_ids(
2234        &self,
2235        position_ids: &AHashSet<PositionId>,
2236        side: Option<PositionSide>,
2237    ) -> Vec<&Position> {
2238        let side = side.unwrap_or(PositionSide::NoPositionSide);
2239        let mut positions = Vec::new();
2240
2241        for position_id in position_ids {
2242            let position = self
2243                .positions
2244                .get(position_id)
2245                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2246            if side == PositionSide::NoPositionSide || side == position.side {
2247                positions.push(position);
2248            }
2249        }
2250
2251        positions
2252    }
2253
2254    /// Returns the `ClientOrderId`s of all orders.
2255    #[must_use]
2256    pub fn client_order_ids(
2257        &self,
2258        venue: Option<&Venue>,
2259        instrument_id: Option<&InstrumentId>,
2260        strategy_id: Option<&StrategyId>,
2261    ) -> AHashSet<ClientOrderId> {
2262        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2263        match query {
2264            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2265            None => self.index.orders.clone(),
2266        }
2267    }
2268
2269    /// Returns the `ClientOrderId`s of all open orders.
2270    #[must_use]
2271    pub fn client_order_ids_open(
2272        &self,
2273        venue: Option<&Venue>,
2274        instrument_id: Option<&InstrumentId>,
2275        strategy_id: Option<&StrategyId>,
2276    ) -> AHashSet<ClientOrderId> {
2277        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2278        match query {
2279            Some(query) => self
2280                .index
2281                .orders_open
2282                .intersection(&query)
2283                .copied()
2284                .collect(),
2285            None => self.index.orders_open.clone(),
2286        }
2287    }
2288
2289    /// Returns the `ClientOrderId`s of all closed orders.
2290    #[must_use]
2291    pub fn client_order_ids_closed(
2292        &self,
2293        venue: Option<&Venue>,
2294        instrument_id: Option<&InstrumentId>,
2295        strategy_id: Option<&StrategyId>,
2296    ) -> AHashSet<ClientOrderId> {
2297        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2298        match query {
2299            Some(query) => self
2300                .index
2301                .orders_closed
2302                .intersection(&query)
2303                .copied()
2304                .collect(),
2305            None => self.index.orders_closed.clone(),
2306        }
2307    }
2308
2309    /// Returns the `ClientOrderId`s of all emulated orders.
2310    #[must_use]
2311    pub fn client_order_ids_emulated(
2312        &self,
2313        venue: Option<&Venue>,
2314        instrument_id: Option<&InstrumentId>,
2315        strategy_id: Option<&StrategyId>,
2316    ) -> AHashSet<ClientOrderId> {
2317        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2318        match query {
2319            Some(query) => self
2320                .index
2321                .orders_emulated
2322                .intersection(&query)
2323                .copied()
2324                .collect(),
2325            None => self.index.orders_emulated.clone(),
2326        }
2327    }
2328
2329    /// Returns the `ClientOrderId`s of all in-flight orders.
2330    #[must_use]
2331    pub fn client_order_ids_inflight(
2332        &self,
2333        venue: Option<&Venue>,
2334        instrument_id: Option<&InstrumentId>,
2335        strategy_id: Option<&StrategyId>,
2336    ) -> AHashSet<ClientOrderId> {
2337        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2338        match query {
2339            Some(query) => self
2340                .index
2341                .orders_inflight
2342                .intersection(&query)
2343                .copied()
2344                .collect(),
2345            None => self.index.orders_inflight.clone(),
2346        }
2347    }
2348
2349    /// Returns `PositionId`s of all positions.
2350    #[must_use]
2351    pub fn position_ids(
2352        &self,
2353        venue: Option<&Venue>,
2354        instrument_id: Option<&InstrumentId>,
2355        strategy_id: Option<&StrategyId>,
2356    ) -> AHashSet<PositionId> {
2357        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2358        match query {
2359            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2360            None => self.index.positions.clone(),
2361        }
2362    }
2363
2364    /// Returns the `PositionId`s of all open positions.
2365    #[must_use]
2366    pub fn position_open_ids(
2367        &self,
2368        venue: Option<&Venue>,
2369        instrument_id: Option<&InstrumentId>,
2370        strategy_id: Option<&StrategyId>,
2371    ) -> AHashSet<PositionId> {
2372        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2373        match query {
2374            Some(query) => self
2375                .index
2376                .positions_open
2377                .intersection(&query)
2378                .copied()
2379                .collect(),
2380            None => self.index.positions_open.clone(),
2381        }
2382    }
2383
2384    /// Returns the `PositionId`s of all closed positions.
2385    #[must_use]
2386    pub fn position_closed_ids(
2387        &self,
2388        venue: Option<&Venue>,
2389        instrument_id: Option<&InstrumentId>,
2390        strategy_id: Option<&StrategyId>,
2391    ) -> AHashSet<PositionId> {
2392        let query = self.build_position_query_filter_set(venue, instrument_id, strategy_id);
2393        match query {
2394            Some(query) => self
2395                .index
2396                .positions_closed
2397                .intersection(&query)
2398                .copied()
2399                .collect(),
2400            None => self.index.positions_closed.clone(),
2401        }
2402    }
2403
2404    /// Returns the `ComponentId`s of all actors.
2405    #[must_use]
2406    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2407        self.index.actors.clone()
2408    }
2409
2410    /// Returns the `StrategyId`s of all strategies.
2411    #[must_use]
2412    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2413        self.index.strategies.clone()
2414    }
2415
2416    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2417    #[must_use]
2418    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2419        self.index.exec_algorithms.clone()
2420    }
2421
2422    // -- ORDER QUERIES ---------------------------------------------------------------------------
2423
2424    /// Gets a reference to the order with the `client_order_id` (if found).
2425    #[must_use]
2426    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2427        self.orders.get(client_order_id)
2428    }
2429
2430    /// Gets a reference to the order with the `client_order_id` (if found).
2431    #[must_use]
2432    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2433        self.orders.get_mut(client_order_id)
2434    }
2435
2436    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2437    #[must_use]
2438    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2439        self.index.venue_order_ids.get(venue_order_id)
2440    }
2441
2442    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2443    #[must_use]
2444    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2445        self.index.client_order_ids.get(client_order_id)
2446    }
2447
2448    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2449    #[must_use]
2450    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2451        self.index.order_client.get(client_order_id)
2452    }
2453
2454    /// Returns references to all orders matching the optional filter parameters.
2455    #[must_use]
2456    pub fn orders(
2457        &self,
2458        venue: Option<&Venue>,
2459        instrument_id: Option<&InstrumentId>,
2460        strategy_id: Option<&StrategyId>,
2461        side: Option<OrderSide>,
2462    ) -> Vec<&OrderAny> {
2463        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id);
2464        self.get_orders_for_ids(&client_order_ids, side)
2465    }
2466
2467    /// Returns references to all open orders matching the optional filter parameters.
2468    #[must_use]
2469    pub fn orders_open(
2470        &self,
2471        venue: Option<&Venue>,
2472        instrument_id: Option<&InstrumentId>,
2473        strategy_id: Option<&StrategyId>,
2474        side: Option<OrderSide>,
2475    ) -> Vec<&OrderAny> {
2476        let client_order_ids = self.client_order_ids_open(venue, instrument_id, strategy_id);
2477        self.get_orders_for_ids(&client_order_ids, side)
2478    }
2479
2480    /// Returns references to all closed orders matching the optional filter parameters.
2481    #[must_use]
2482    pub fn orders_closed(
2483        &self,
2484        venue: Option<&Venue>,
2485        instrument_id: Option<&InstrumentId>,
2486        strategy_id: Option<&StrategyId>,
2487        side: Option<OrderSide>,
2488    ) -> Vec<&OrderAny> {
2489        let client_order_ids = self.client_order_ids_closed(venue, instrument_id, strategy_id);
2490        self.get_orders_for_ids(&client_order_ids, side)
2491    }
2492
2493    /// Returns references to all emulated orders matching the optional filter parameters.
2494    #[must_use]
2495    pub fn orders_emulated(
2496        &self,
2497        venue: Option<&Venue>,
2498        instrument_id: Option<&InstrumentId>,
2499        strategy_id: Option<&StrategyId>,
2500        side: Option<OrderSide>,
2501    ) -> Vec<&OrderAny> {
2502        let client_order_ids = self.client_order_ids_emulated(venue, instrument_id, strategy_id);
2503        self.get_orders_for_ids(&client_order_ids, side)
2504    }
2505
2506    /// Returns references to all in-flight orders matching the optional filter parameters.
2507    #[must_use]
2508    pub fn orders_inflight(
2509        &self,
2510        venue: Option<&Venue>,
2511        instrument_id: Option<&InstrumentId>,
2512        strategy_id: Option<&StrategyId>,
2513        side: Option<OrderSide>,
2514    ) -> Vec<&OrderAny> {
2515        let client_order_ids = self.client_order_ids_inflight(venue, instrument_id, strategy_id);
2516        self.get_orders_for_ids(&client_order_ids, side)
2517    }
2518
2519    /// Returns references to all orders for the `position_id`.
2520    #[must_use]
2521    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2522        let client_order_ids = self.index.position_orders.get(position_id);
2523        match client_order_ids {
2524            Some(client_order_ids) => {
2525                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2526            }
2527            None => Vec::new(),
2528        }
2529    }
2530
2531    /// Returns whether an order with the `client_order_id` exists.
2532    #[must_use]
2533    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2534        self.index.orders.contains(client_order_id)
2535    }
2536
2537    /// Returns whether an order with the `client_order_id` is open.
2538    #[must_use]
2539    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2540        self.index.orders_open.contains(client_order_id)
2541    }
2542
2543    /// Returns whether an order with the `client_order_id` is closed.
2544    #[must_use]
2545    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2546        self.index.orders_closed.contains(client_order_id)
2547    }
2548
2549    /// Returns whether an order with the `client_order_id` is emulated.
2550    #[must_use]
2551    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2552        self.index.orders_emulated.contains(client_order_id)
2553    }
2554
2555    /// Returns whether an order with the `client_order_id` is in-flight.
2556    #[must_use]
2557    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2558        self.index.orders_inflight.contains(client_order_id)
2559    }
2560
2561    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2562    #[must_use]
2563    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2564        self.index.orders_pending_cancel.contains(client_order_id)
2565    }
2566
2567    /// Returns the count of all open orders.
2568    #[must_use]
2569    pub fn orders_open_count(
2570        &self,
2571        venue: Option<&Venue>,
2572        instrument_id: Option<&InstrumentId>,
2573        strategy_id: Option<&StrategyId>,
2574        side: Option<OrderSide>,
2575    ) -> usize {
2576        self.orders_open(venue, instrument_id, strategy_id, side)
2577            .len()
2578    }
2579
2580    /// Returns the count of all closed orders.
2581    #[must_use]
2582    pub fn orders_closed_count(
2583        &self,
2584        venue: Option<&Venue>,
2585        instrument_id: Option<&InstrumentId>,
2586        strategy_id: Option<&StrategyId>,
2587        side: Option<OrderSide>,
2588    ) -> usize {
2589        self.orders_closed(venue, instrument_id, strategy_id, side)
2590            .len()
2591    }
2592
2593    /// Returns the count of all emulated orders.
2594    #[must_use]
2595    pub fn orders_emulated_count(
2596        &self,
2597        venue: Option<&Venue>,
2598        instrument_id: Option<&InstrumentId>,
2599        strategy_id: Option<&StrategyId>,
2600        side: Option<OrderSide>,
2601    ) -> usize {
2602        self.orders_emulated(venue, instrument_id, strategy_id, side)
2603            .len()
2604    }
2605
2606    /// Returns the count of all in-flight orders.
2607    #[must_use]
2608    pub fn orders_inflight_count(
2609        &self,
2610        venue: Option<&Venue>,
2611        instrument_id: Option<&InstrumentId>,
2612        strategy_id: Option<&StrategyId>,
2613        side: Option<OrderSide>,
2614    ) -> usize {
2615        self.orders_inflight(venue, instrument_id, strategy_id, side)
2616            .len()
2617    }
2618
2619    /// Returns the count of all orders.
2620    #[must_use]
2621    pub fn orders_total_count(
2622        &self,
2623        venue: Option<&Venue>,
2624        instrument_id: Option<&InstrumentId>,
2625        strategy_id: Option<&StrategyId>,
2626        side: Option<OrderSide>,
2627    ) -> usize {
2628        self.orders(venue, instrument_id, strategy_id, side).len()
2629    }
2630
2631    /// Returns the order list for the `order_list_id`.
2632    #[must_use]
2633    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2634        self.order_lists.get(order_list_id)
2635    }
2636
2637    /// Returns all order lists matching the optional filter parameters.
2638    #[must_use]
2639    pub fn order_lists(
2640        &self,
2641        venue: Option<&Venue>,
2642        instrument_id: Option<&InstrumentId>,
2643        strategy_id: Option<&StrategyId>,
2644    ) -> Vec<&OrderList> {
2645        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2646
2647        if let Some(venue) = venue {
2648            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2649        }
2650
2651        if let Some(instrument_id) = instrument_id {
2652            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2653        }
2654
2655        if let Some(strategy_id) = strategy_id {
2656            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2657        }
2658
2659        order_lists
2660    }
2661
2662    /// Returns whether an order list with the `order_list_id` exists.
2663    #[must_use]
2664    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2665        self.order_lists.contains_key(order_list_id)
2666    }
2667
2668    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2669
2670    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2671    /// optional filter parameters.
2672    #[must_use]
2673    pub fn orders_for_exec_algorithm(
2674        &self,
2675        exec_algorithm_id: &ExecAlgorithmId,
2676        venue: Option<&Venue>,
2677        instrument_id: Option<&InstrumentId>,
2678        strategy_id: Option<&StrategyId>,
2679        side: Option<OrderSide>,
2680    ) -> Vec<&OrderAny> {
2681        let query = self.build_order_query_filter_set(venue, instrument_id, strategy_id);
2682        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2683
2684        if let Some(query) = query
2685            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2686        {
2687            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2688        }
2689
2690        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2691            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2692        } else {
2693            Vec::new()
2694        }
2695    }
2696
2697    /// Returns references to all orders with the `exec_spawn_id`.
2698    #[must_use]
2699    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2700        self.get_orders_for_ids(
2701            self.index
2702                .exec_spawn_orders
2703                .get(exec_spawn_id)
2704                .unwrap_or(&AHashSet::new()),
2705            None,
2706        )
2707    }
2708
2709    /// Returns the total order quantity for the `exec_spawn_id`.
2710    #[must_use]
2711    pub fn exec_spawn_total_quantity(
2712        &self,
2713        exec_spawn_id: &ClientOrderId,
2714        active_only: bool,
2715    ) -> Option<Quantity> {
2716        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2717
2718        let mut total_quantity: Option<Quantity> = None;
2719
2720        for spawn_order in exec_spawn_orders {
2721            if active_only && spawn_order.is_closed() {
2722                continue;
2723            }
2724
2725            match total_quantity.as_mut() {
2726                Some(total) => *total += spawn_order.quantity(),
2727                None => total_quantity = Some(spawn_order.quantity()),
2728            }
2729        }
2730
2731        total_quantity
2732    }
2733
2734    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
2735    #[must_use]
2736    pub fn exec_spawn_total_filled_qty(
2737        &self,
2738        exec_spawn_id: &ClientOrderId,
2739        active_only: bool,
2740    ) -> Option<Quantity> {
2741        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2742
2743        let mut total_quantity: Option<Quantity> = None;
2744
2745        for spawn_order in exec_spawn_orders {
2746            if active_only && spawn_order.is_closed() {
2747                continue;
2748            }
2749
2750            match total_quantity.as_mut() {
2751                Some(total) => *total += spawn_order.filled_qty(),
2752                None => total_quantity = Some(spawn_order.filled_qty()),
2753            }
2754        }
2755
2756        total_quantity
2757    }
2758
2759    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
2760    #[must_use]
2761    pub fn exec_spawn_total_leaves_qty(
2762        &self,
2763        exec_spawn_id: &ClientOrderId,
2764        active_only: bool,
2765    ) -> Option<Quantity> {
2766        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2767
2768        let mut total_quantity: Option<Quantity> = None;
2769
2770        for spawn_order in exec_spawn_orders {
2771            if active_only && spawn_order.is_closed() {
2772                continue;
2773            }
2774
2775            match total_quantity.as_mut() {
2776                Some(total) => *total += spawn_order.leaves_qty(),
2777                None => total_quantity = Some(spawn_order.leaves_qty()),
2778            }
2779        }
2780
2781        total_quantity
2782    }
2783
2784    // -- POSITION QUERIES ------------------------------------------------------------------------
2785
2786    /// Returns a reference to the position with the `position_id` (if found).
2787    #[must_use]
2788    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
2789        self.positions.get(position_id)
2790    }
2791
2792    /// Returns a reference to the position for the `client_order_id` (if found).
2793    #[must_use]
2794    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
2795        self.index
2796            .order_position
2797            .get(client_order_id)
2798            .and_then(|position_id| self.positions.get(position_id))
2799    }
2800
2801    /// Returns a reference to the position ID for the `client_order_id` (if found).
2802    #[must_use]
2803    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
2804        self.index.order_position.get(client_order_id)
2805    }
2806
2807    /// Returns a reference to all positions matching the optional filter parameters.
2808    #[must_use]
2809    pub fn positions(
2810        &self,
2811        venue: Option<&Venue>,
2812        instrument_id: Option<&InstrumentId>,
2813        strategy_id: Option<&StrategyId>,
2814        side: Option<PositionSide>,
2815    ) -> Vec<&Position> {
2816        let position_ids = self.position_ids(venue, instrument_id, strategy_id);
2817        self.get_positions_for_ids(&position_ids, side)
2818    }
2819
2820    /// Returns a reference to all open positions matching the optional filter parameters.
2821    #[must_use]
2822    pub fn positions_open(
2823        &self,
2824        venue: Option<&Venue>,
2825        instrument_id: Option<&InstrumentId>,
2826        strategy_id: Option<&StrategyId>,
2827        side: Option<PositionSide>,
2828    ) -> Vec<&Position> {
2829        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id);
2830        self.get_positions_for_ids(&position_ids, side)
2831    }
2832
2833    /// Returns a reference to all closed positions matching the optional filter parameters.
2834    #[must_use]
2835    pub fn positions_closed(
2836        &self,
2837        venue: Option<&Venue>,
2838        instrument_id: Option<&InstrumentId>,
2839        strategy_id: Option<&StrategyId>,
2840        side: Option<PositionSide>,
2841    ) -> Vec<&Position> {
2842        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id);
2843        self.get_positions_for_ids(&position_ids, side)
2844    }
2845
2846    /// Returns whether a position with the `position_id` exists.
2847    #[must_use]
2848    pub fn position_exists(&self, position_id: &PositionId) -> bool {
2849        self.index.positions.contains(position_id)
2850    }
2851
2852    /// Returns whether a position with the `position_id` is open.
2853    #[must_use]
2854    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
2855        self.index.positions_open.contains(position_id)
2856    }
2857
2858    /// Returns whether a position with the `position_id` is closed.
2859    #[must_use]
2860    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
2861        self.index.positions_closed.contains(position_id)
2862    }
2863
2864    /// Returns the count of all open positions.
2865    #[must_use]
2866    pub fn positions_open_count(
2867        &self,
2868        venue: Option<&Venue>,
2869        instrument_id: Option<&InstrumentId>,
2870        strategy_id: Option<&StrategyId>,
2871        side: Option<PositionSide>,
2872    ) -> usize {
2873        self.positions_open(venue, instrument_id, strategy_id, side)
2874            .len()
2875    }
2876
2877    /// Returns the count of all closed positions.
2878    #[must_use]
2879    pub fn positions_closed_count(
2880        &self,
2881        venue: Option<&Venue>,
2882        instrument_id: Option<&InstrumentId>,
2883        strategy_id: Option<&StrategyId>,
2884        side: Option<PositionSide>,
2885    ) -> usize {
2886        self.positions_closed(venue, instrument_id, strategy_id, side)
2887            .len()
2888    }
2889
2890    /// Returns the count of all positions.
2891    #[must_use]
2892    pub fn positions_total_count(
2893        &self,
2894        venue: Option<&Venue>,
2895        instrument_id: Option<&InstrumentId>,
2896        strategy_id: Option<&StrategyId>,
2897        side: Option<PositionSide>,
2898    ) -> usize {
2899        self.positions(venue, instrument_id, strategy_id, side)
2900            .len()
2901    }
2902
2903    // -- STRATEGY QUERIES ------------------------------------------------------------------------
2904
2905    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
2906    #[must_use]
2907    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
2908        self.index.order_strategy.get(client_order_id)
2909    }
2910
2911    /// Gets a reference to the strategy ID for the `position_id` (if found).
2912    #[must_use]
2913    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
2914        self.index.position_strategy.get(position_id)
2915    }
2916
2917    // -- GENERAL ---------------------------------------------------------------------------------
2918
2919    /// Gets a reference to the general value for the `key` (if found).
2920    ///
2921    /// # Errors
2922    ///
2923    /// Returns an error if the `key` is invalid.
2924    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
2925        check_valid_string_ascii(key, stringify!(key))?;
2926
2927        Ok(self.general.get(key))
2928    }
2929
2930    // -- DATA QUERIES ----------------------------------------------------------------------------
2931
2932    /// Returns the price for the `instrument_id` and `price_type` (if found).
2933    #[must_use]
2934    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
2935        match price_type {
2936            PriceType::Bid => self
2937                .quotes
2938                .get(instrument_id)
2939                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
2940            PriceType::Ask => self
2941                .quotes
2942                .get(instrument_id)
2943                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
2944            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
2945                quotes.front().map(|quote| {
2946                    Price::new(
2947                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
2948                        quote.bid_price.precision + 1,
2949                    )
2950                })
2951            }),
2952            PriceType::Last => self
2953                .trades
2954                .get(instrument_id)
2955                .and_then(|trades| trades.front().map(|trade| trade.price)),
2956            PriceType::Mark => self
2957                .mark_prices
2958                .get(instrument_id)
2959                .and_then(|marks| marks.front().map(|mark| mark.value)),
2960        }
2961    }
2962
2963    /// Gets all quotes for the `instrument_id`.
2964    #[must_use]
2965    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
2966        self.quotes
2967            .get(instrument_id)
2968            .map(|quotes| quotes.iter().copied().collect())
2969    }
2970
2971    /// Gets all trades for the `instrument_id`.
2972    #[must_use]
2973    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
2974        self.trades
2975            .get(instrument_id)
2976            .map(|trades| trades.iter().copied().collect())
2977    }
2978
2979    /// Gets all mark price updates for the `instrument_id`.
2980    #[must_use]
2981    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
2982        self.mark_prices
2983            .get(instrument_id)
2984            .map(|mark_prices| mark_prices.iter().copied().collect())
2985    }
2986
2987    /// Gets all index price updates for the `instrument_id`.
2988    #[must_use]
2989    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
2990        self.index_prices
2991            .get(instrument_id)
2992            .map(|index_prices| index_prices.iter().copied().collect())
2993    }
2994
2995    /// Gets all bars for the `bar_type`.
2996    #[must_use]
2997    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
2998        self.bars
2999            .get(bar_type)
3000            .map(|bars| bars.iter().copied().collect())
3001    }
3002
3003    /// Gets a reference to the order book for the `instrument_id`.
3004    #[must_use]
3005    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3006        self.books.get(instrument_id)
3007    }
3008
3009    /// Gets a reference to the order book for the `instrument_id`.
3010    #[must_use]
3011    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3012        self.books.get_mut(instrument_id)
3013    }
3014
3015    /// Gets a reference to the own order book for the `instrument_id`.
3016    #[must_use]
3017    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3018        self.own_books.get(instrument_id)
3019    }
3020
3021    /// Gets a reference to the own order book for the `instrument_id`.
3022    #[must_use]
3023    pub fn own_order_book_mut(
3024        &mut self,
3025        instrument_id: &InstrumentId,
3026    ) -> Option<&mut OwnOrderBook> {
3027        self.own_books.get_mut(instrument_id)
3028    }
3029
3030    /// Gets a reference to the latest quote for the `instrument_id`.
3031    #[must_use]
3032    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3033        self.quotes
3034            .get(instrument_id)
3035            .and_then(|quotes| quotes.front())
3036    }
3037
3038    /// Gets a reference to the latest trade for the `instrument_id`.
3039    #[must_use]
3040    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3041        self.trades
3042            .get(instrument_id)
3043            .and_then(|trades| trades.front())
3044    }
3045
3046    /// Gets a reference to the latest mark price update for the `instrument_id`.
3047    #[must_use]
3048    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3049        self.mark_prices
3050            .get(instrument_id)
3051            .and_then(|mark_prices| mark_prices.front())
3052    }
3053
3054    /// Gets a reference to the latest index price update for the `instrument_id`.
3055    #[must_use]
3056    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3057        self.index_prices
3058            .get(instrument_id)
3059            .and_then(|index_prices| index_prices.front())
3060    }
3061
3062    /// Gets a reference to the funding rate update for the `instrument_id`.
3063    #[must_use]
3064    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3065        self.funding_rates.get(instrument_id)
3066    }
3067
3068    /// Gets a reference to the latest bar for the `bar_type`.
3069    #[must_use]
3070    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3071        self.bars.get(bar_type).and_then(|bars| bars.front())
3072    }
3073
3074    /// Gets the order book update count for the `instrument_id`.
3075    #[must_use]
3076    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3077        self.books
3078            .get(instrument_id)
3079            .map_or(0, |book| book.update_count) as usize
3080    }
3081
3082    /// Gets the quote tick count for the `instrument_id`.
3083    #[must_use]
3084    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3085        self.quotes
3086            .get(instrument_id)
3087            .map_or(0, std::collections::VecDeque::len)
3088    }
3089
3090    /// Gets the trade tick count for the `instrument_id`.
3091    #[must_use]
3092    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3093        self.trades
3094            .get(instrument_id)
3095            .map_or(0, std::collections::VecDeque::len)
3096    }
3097
3098    /// Gets the bar count for the `instrument_id`.
3099    #[must_use]
3100    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3101        self.bars
3102            .get(bar_type)
3103            .map_or(0, std::collections::VecDeque::len)
3104    }
3105
3106    /// Returns whether the cache contains an order book for the `instrument_id`.
3107    #[must_use]
3108    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3109        self.books.contains_key(instrument_id)
3110    }
3111
3112    /// Returns whether the cache contains quotes for the `instrument_id`.
3113    #[must_use]
3114    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3115        self.quote_count(instrument_id) > 0
3116    }
3117
3118    /// Returns whether the cache contains trades for the `instrument_id`.
3119    #[must_use]
3120    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3121        self.trade_count(instrument_id) > 0
3122    }
3123
3124    /// Returns whether the cache contains bars for the `bar_type`.
3125    #[must_use]
3126    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3127        self.bar_count(bar_type) > 0
3128    }
3129
3130    #[must_use]
3131    pub fn get_xrate(
3132        &self,
3133        venue: Venue,
3134        from_currency: Currency,
3135        to_currency: Currency,
3136        price_type: PriceType,
3137    ) -> Option<f64> {
3138        if from_currency == to_currency {
3139            // When the source and target currencies are identical,
3140            // no conversion is needed; return an exchange rate of 1.0.
3141            return Some(1.0);
3142        }
3143
3144        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3145
3146        match get_exchange_rate(
3147            from_currency.code,
3148            to_currency.code,
3149            price_type,
3150            bid_quote,
3151            ask_quote,
3152        ) {
3153            Ok(rate) => rate,
3154            Err(e) => {
3155                log::error!("Failed to calculate xrate: {e}");
3156                None
3157            }
3158        }
3159    }
3160
3161    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3162        let mut bid_quotes = AHashMap::new();
3163        let mut ask_quotes = AHashMap::new();
3164
3165        for instrument_id in self.instruments.keys() {
3166            if instrument_id.venue != *venue {
3167                continue;
3168            }
3169
3170            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3171                if let Some(tick) = ticks.front() {
3172                    (tick.bid_price, tick.ask_price)
3173                } else {
3174                    continue; // Empty ticks vector
3175                }
3176            } else {
3177                let bid_bar = self
3178                    .bars
3179                    .iter()
3180                    .find(|(k, _)| {
3181                        k.instrument_id() == *instrument_id
3182                            && matches!(k.spec().price_type, PriceType::Bid)
3183                    })
3184                    .map(|(_, v)| v);
3185
3186                let ask_bar = self
3187                    .bars
3188                    .iter()
3189                    .find(|(k, _)| {
3190                        k.instrument_id() == *instrument_id
3191                            && matches!(k.spec().price_type, PriceType::Ask)
3192                    })
3193                    .map(|(_, v)| v);
3194
3195                match (bid_bar, ask_bar) {
3196                    (Some(bid), Some(ask)) => {
3197                        match (bid.front(), ask.front()) {
3198                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3199                            _ => {
3200                                // Empty bar VecDeques
3201                                continue;
3202                            }
3203                        }
3204                    }
3205                    _ => continue,
3206                }
3207            };
3208
3209            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3210            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3211        }
3212
3213        (bid_quotes, ask_quotes)
3214    }
3215
3216    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3217    #[must_use]
3218    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3219        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3220    }
3221
3222    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3223    ///
3224    /// # Panics
3225    ///
3226    /// Panics if `xrate` is not positive.
3227    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3228        assert!(xrate > 0.0, "xrate was zero");
3229        self.mark_xrates.insert((from_currency, to_currency), xrate);
3230        self.mark_xrates
3231            .insert((to_currency, from_currency), 1.0 / xrate);
3232    }
3233
3234    /// Clears the mark exchange rate for the given currency pair.
3235    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3236        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3237    }
3238
3239    /// Clears all mark exchange rates.
3240    pub fn clear_mark_xrates(&mut self) {
3241        self.mark_xrates.clear();
3242    }
3243
3244    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3245
3246    /// Returns a reference to the instrument for the `instrument_id` (if found).
3247    #[must_use]
3248    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3249        self.instruments.get(instrument_id)
3250    }
3251
3252    /// Returns references to all instrument IDs for the `venue`.
3253    #[must_use]
3254    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3255        match venue {
3256            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3257            None => self.instruments.keys().collect(),
3258        }
3259    }
3260
3261    /// Returns references to all instruments for the `venue`.
3262    #[must_use]
3263    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3264        self.instruments
3265            .values()
3266            .filter(|i| &i.id().venue == venue)
3267            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3268            .collect()
3269    }
3270
3271    /// Returns references to all bar types contained in the cache.
3272    #[must_use]
3273    pub fn bar_types(
3274        &self,
3275        instrument_id: Option<&InstrumentId>,
3276        price_type: Option<&PriceType>,
3277        aggregation_source: AggregationSource,
3278    ) -> Vec<&BarType> {
3279        let mut bar_types = self
3280            .bars
3281            .keys()
3282            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3283            .collect::<Vec<&BarType>>();
3284
3285        if let Some(instrument_id) = instrument_id {
3286            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3287        }
3288
3289        if let Some(price_type) = price_type {
3290            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3291        }
3292
3293        bar_types
3294    }
3295
3296    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3297
3298    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3299    #[must_use]
3300    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3301        self.synthetics.get(instrument_id)
3302    }
3303
3304    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3305    #[must_use]
3306    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3307        self.synthetics.keys().collect()
3308    }
3309
3310    /// Returns references to all synthetic instruments contained in the cache.
3311    #[must_use]
3312    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3313        self.synthetics.values().collect()
3314    }
3315
3316    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3317
3318    /// Returns a reference to the account for the `account_id` (if found).
3319    #[must_use]
3320    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3321        self.accounts.get(account_id)
3322    }
3323
3324    /// Returns a reference to the account for the `venue` (if found).
3325    #[must_use]
3326    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3327        self.index
3328            .venue_account
3329            .get(venue)
3330            .and_then(|account_id| self.accounts.get(account_id))
3331    }
3332
3333    /// Returns a reference to the account ID for the `venue` (if found).
3334    #[must_use]
3335    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3336        self.index.venue_account.get(venue)
3337    }
3338
3339    /// Returns references to all accounts for the `account_id`.
3340    #[must_use]
3341    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3342        self.accounts
3343            .values()
3344            .filter(|account| &account.id() == account_id)
3345            .collect()
3346    }
3347
3348    /// Updates the own order book with an order.
3349    ///
3350    /// This method adds, updates, or removes an order from the own order book
3351    /// based on the order's current state.
3352    ///
3353    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3354    /// represented in own books.
3355    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3356        if !order.has_price() {
3357            return;
3358        }
3359
3360        let instrument_id = order.instrument_id();
3361
3362        let own_book = self
3363            .own_books
3364            .entry(instrument_id)
3365            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3366
3367        let own_book_order = order.to_own_book_order();
3368
3369        if order.is_closed() {
3370            if let Err(e) = own_book.delete(own_book_order) {
3371                log::debug!(
3372                    "Failed to delete order {} from own book: {e}",
3373                    order.client_order_id(),
3374                );
3375            } else {
3376                log::debug!("Deleted order {} from own book", order.client_order_id());
3377            }
3378        } else {
3379            // Add or update the order in the own book
3380            if let Err(e) = own_book.update(own_book_order) {
3381                log::debug!(
3382                    "Failed to update order {} in own book: {e}; inserting instead",
3383                    order.client_order_id(),
3384                );
3385                own_book.add(own_book_order);
3386            }
3387            log::debug!("Updated order {} in own book", order.client_order_id());
3388        }
3389    }
3390
3391    /// Force removal of an order from own order books and clean up all indexes.
3392    ///
3393    /// This method is used when order event application fails and we need to ensure
3394    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3395    /// Replicates the index cleanup that update_order performs for closed orders.
3396    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3397        let order = match self.orders.get(client_order_id) {
3398            Some(order) => order,
3399            None => return,
3400        };
3401
3402        self.index.orders_open.remove(client_order_id);
3403        self.index.orders_pending_cancel.remove(client_order_id);
3404        self.index.orders_inflight.remove(client_order_id);
3405        self.index.orders_emulated.remove(client_order_id);
3406
3407        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3408            && order.has_price()
3409        {
3410            let own_book_order = order.to_own_book_order();
3411            if let Err(e) = own_book.delete(own_book_order) {
3412                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3413            } else {
3414                log::debug!("Force deleted {client_order_id} from own book");
3415            }
3416        }
3417
3418        self.index.orders_closed.insert(*client_order_id);
3419    }
3420
3421    /// Audit all own order books against open and inflight order indexes.
3422    ///
3423    /// Ensures closed orders are removed from own order books. This includes both
3424    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3425    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3426    /// during venue latency windows.
3427    pub fn audit_own_order_books(&mut self) {
3428        log::debug!("Starting own books audit");
3429        let start = std::time::Instant::now();
3430
3431        // Build union of open and inflight orders for audit,
3432        // this prevents false positives for SUBMITTED orders during venue latency.
3433        let valid_order_ids: AHashSet<ClientOrderId> = self
3434            .index
3435            .orders_open
3436            .union(&self.index.orders_inflight)
3437            .copied()
3438            .collect();
3439
3440        for own_book in self.own_books.values_mut() {
3441            own_book.audit_open_orders(&valid_order_ids);
3442        }
3443
3444        log::debug!("Completed own books audit in {:?}", start.elapsed());
3445    }
3446}