Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31    collections::VecDeque,
32    fmt::{Debug, Display},
33    time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; // Re-export
39use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42    UUID4, UnixNanos,
43    correctness::{
44        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45        check_valid_string_ascii,
46    },
47    datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50    accounts::{Account, AccountAny},
51    data::{
52        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53        TradeTick, YieldCurveData, option_chain::OptionGreeks,
54    },
55    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56    identifiers::{
57        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59    },
60    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61    orderbook::{
62        OrderBook,
63        own::{OwnOrderBook, should_handle_own_book_order},
64    },
65    orders::{Order, OrderAny, OrderList},
66    position::Position,
67    types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
73/// A common in-memory `Cache` for market and execution related data.
74#[cfg_attr(
75    feature = "python",
76    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
77)]
78pub struct Cache {
79    config: CacheConfig,
80    index: CacheIndex,
81    database: Option<Box<dyn CacheDatabaseAdapter>>,
82    general: AHashMap<String, Bytes>,
83    currencies: AHashMap<Ustr, Currency>,
84    instruments: AHashMap<InstrumentId, InstrumentAny>,
85    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
86    books: AHashMap<InstrumentId, OrderBook>,
87    own_books: AHashMap<InstrumentId, OwnOrderBook>,
88    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
89    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
90    mark_xrates: AHashMap<(Currency, Currency), f64>,
91    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
92    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
93    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
94    bars: AHashMap<BarType, VecDeque<Bar>>,
95    greeks: AHashMap<InstrumentId, GreeksData>,
96    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
97    yield_curves: AHashMap<String, YieldCurveData>,
98    accounts: AHashMap<AccountId, AccountAny>,
99    orders: AHashMap<ClientOrderId, OrderAny>,
100    order_lists: AHashMap<OrderListId, OrderList>,
101    positions: AHashMap<PositionId, Position>,
102    position_snapshots: AHashMap<PositionId, Bytes>,
103    #[cfg(feature = "defi")]
104    pub(crate) defi: crate::defi::cache::DefiCache,
105}
106
107impl Debug for Cache {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct(stringify!(Cache))
110            .field("config", &self.config)
111            .field("index", &self.index)
112            .field("general", &self.general)
113            .field("currencies", &self.currencies)
114            .field("instruments", &self.instruments)
115            .field("synthetics", &self.synthetics)
116            .field("books", &self.books)
117            .field("own_books", &self.own_books)
118            .field("quotes", &self.quotes)
119            .field("trades", &self.trades)
120            .field("mark_xrates", &self.mark_xrates)
121            .field("mark_prices", &self.mark_prices)
122            .field("index_prices", &self.index_prices)
123            .field("funding_rates", &self.funding_rates)
124            .field("bars", &self.bars)
125            .field("greeks", &self.greeks)
126            .field("option_greeks", &self.option_greeks)
127            .field("yield_curves", &self.yield_curves)
128            .field("accounts", &self.accounts)
129            .field("orders", &self.orders)
130            .field("order_lists", &self.order_lists)
131            .field("positions", &self.positions)
132            .field("position_snapshots", &self.position_snapshots)
133            .finish()
134    }
135}
136
137impl Default for Cache {
138    /// Creates a new default [`Cache`] instance.
139    fn default() -> Self {
140        Self::new(Some(CacheConfig::default()), None)
141    }
142}
143
144impl Cache {
145    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
146    #[must_use]
147    /// # Note
148    ///
149    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
150    pub fn new(
151        config: Option<CacheConfig>,
152        database: Option<Box<dyn CacheDatabaseAdapter>>,
153    ) -> Self {
154        Self {
155            config: config.unwrap_or_default(),
156            index: CacheIndex::default(),
157            database,
158            general: AHashMap::new(),
159            currencies: AHashMap::new(),
160            instruments: AHashMap::new(),
161            synthetics: AHashMap::new(),
162            books: AHashMap::new(),
163            own_books: AHashMap::new(),
164            quotes: AHashMap::new(),
165            trades: AHashMap::new(),
166            mark_xrates: AHashMap::new(),
167            mark_prices: AHashMap::new(),
168            index_prices: AHashMap::new(),
169            funding_rates: AHashMap::new(),
170            bars: AHashMap::new(),
171            greeks: AHashMap::new(),
172            option_greeks: AHashMap::new(),
173            yield_curves: AHashMap::new(),
174            accounts: AHashMap::new(),
175            orders: AHashMap::new(),
176            order_lists: AHashMap::new(),
177            positions: AHashMap::new(),
178            position_snapshots: AHashMap::new(),
179            #[cfg(feature = "defi")]
180            defi: crate::defi::cache::DefiCache::default(),
181        }
182    }
183
184    /// Returns the cache instances memory address.
185    #[must_use]
186    pub fn memory_address(&self) -> String {
187        format!("{:?}", std::ptr::from_ref(self))
188    }
189
190    /// Sets the cache database adapter for persistence.
191    ///
192    /// This allows setting or replacing the database adapter after cache construction.
193    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
194        let type_name = std::any::type_name_of_val(&*database);
195        log::info!("Cache database adapter set: {type_name}");
196        self.database = Some(database);
197    }
198
199    // -- COMMANDS --------------------------------------------------------------------------------
200
201    /// Clears and reloads general entries from the database into the cache.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if loading general cache data fails.
206    pub fn cache_general(&mut self) -> anyhow::Result<()> {
207        self.general = match &mut self.database {
208            Some(db) => db.load()?,
209            None => AHashMap::new(),
210        };
211
212        log::info!(
213            "Cached {} general object(s) from database",
214            self.general.len()
215        );
216        Ok(())
217    }
218
219    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if loading all cache data fails.
224    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
225        let cache_map = match &self.database {
226            Some(db) => db.load_all().await?,
227            None => CacheMap::default(),
228        };
229
230        self.currencies = cache_map.currencies;
231        self.instruments = cache_map.instruments;
232        self.synthetics = cache_map.synthetics;
233        self.accounts = cache_map.accounts;
234        self.orders = cache_map.orders;
235        self.positions = cache_map.positions;
236        Ok(())
237    }
238
239    /// Clears and reloads the currency cache from the database.
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if loading currencies cache fails.
244    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
245        self.currencies = match &mut self.database {
246            Some(db) => db.load_currencies().await?,
247            None => AHashMap::new(),
248        };
249
250        log::info!("Cached {} currencies from database", self.general.len());
251        Ok(())
252    }
253
254    /// Clears and reloads the instrument cache from the database.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if loading instruments cache fails.
259    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
260        self.instruments = match &mut self.database {
261            Some(db) => db.load_instruments().await?,
262            None => AHashMap::new(),
263        };
264
265        log::info!("Cached {} instruments from database", self.general.len());
266        Ok(())
267    }
268
269    /// Clears and reloads the synthetic instrument cache from the database.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if loading synthetic instruments cache fails.
274    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
275        self.synthetics = match &mut self.database {
276            Some(db) => db.load_synthetics().await?,
277            None => AHashMap::new(),
278        };
279
280        log::info!(
281            "Cached {} synthetic instruments from database",
282            self.general.len()
283        );
284        Ok(())
285    }
286
287    /// Clears and reloads the account cache from the database.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if loading accounts cache fails.
292    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
293        self.accounts = match &mut self.database {
294            Some(db) => db.load_accounts().await?,
295            None => AHashMap::new(),
296        };
297
298        log::info!(
299            "Cached {} synthetic instruments from database",
300            self.general.len()
301        );
302        Ok(())
303    }
304
305    /// Clears and reloads the order cache from the database.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if loading orders cache fails.
310    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
311        self.orders = match &mut self.database {
312            Some(db) => db.load_orders().await?,
313            None => AHashMap::new(),
314        };
315
316        log::info!("Cached {} orders from database", self.general.len());
317        Ok(())
318    }
319
320    /// Clears and reloads the position cache from the database.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if loading positions cache fails.
325    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
326        self.positions = match &mut self.database {
327            Some(db) => db.load_positions().await?,
328            None => AHashMap::new(),
329        };
330
331        log::info!("Cached {} positions from database", self.general.len());
332        Ok(())
333    }
334
335    /// Clears the current cache index and re-build.
336    pub fn build_index(&mut self) {
337        log::debug!("Building index");
338
339        // Index accounts
340        for account_id in self.accounts.keys() {
341            self.index
342                .venue_account
343                .insert(account_id.get_issuer(), *account_id);
344        }
345
346        // Index orders
347        for (client_order_id, order) in &self.orders {
348            let instrument_id = order.instrument_id();
349            let venue = instrument_id.venue;
350            let strategy_id = order.strategy_id();
351
352            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
353            self.index
354                .venue_orders
355                .entry(venue)
356                .or_default()
357                .insert(*client_order_id);
358
359            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
360            if let Some(venue_order_id) = order.venue_order_id() {
361                self.index
362                    .venue_order_ids
363                    .insert(venue_order_id, *client_order_id);
364            }
365
366            // 3: Build index.order_position -> {ClientOrderId, PositionId}
367            if let Some(position_id) = order.position_id() {
368                self.index
369                    .order_position
370                    .insert(*client_order_id, position_id);
371            }
372
373            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
374            self.index
375                .order_strategy
376                .insert(*client_order_id, order.strategy_id());
377
378            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
379            self.index
380                .instrument_orders
381                .entry(instrument_id)
382                .or_default()
383                .insert(*client_order_id);
384
385            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
386            self.index
387                .strategy_orders
388                .entry(strategy_id)
389                .or_default()
390                .insert(*client_order_id);
391
392            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
393            if let Some(account_id) = order.account_id() {
394                self.index
395                    .account_orders
396                    .entry(account_id)
397                    .or_default()
398                    .insert(*client_order_id);
399            }
400
401            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
402            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
403                self.index
404                    .exec_algorithm_orders
405                    .entry(exec_algorithm_id)
406                    .or_default()
407                    .insert(*client_order_id);
408            }
409
410            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
411            if let Some(exec_spawn_id) = order.exec_spawn_id() {
412                self.index
413                    .exec_spawn_orders
414                    .entry(exec_spawn_id)
415                    .or_default()
416                    .insert(*client_order_id);
417            }
418
419            // 9: Build index.orders -> {ClientOrderId}
420            self.index.orders.insert(*client_order_id);
421
422            // 10: Build index.orders_active_local -> {ClientOrderId}
423            if order.is_active_local() {
424                self.index.orders_active_local.insert(*client_order_id);
425            }
426
427            // 11: Build index.orders_open -> {ClientOrderId}
428            if order.is_open() {
429                self.index.orders_open.insert(*client_order_id);
430            }
431
432            // 12: Build index.orders_closed -> {ClientOrderId}
433            if order.is_closed() {
434                self.index.orders_closed.insert(*client_order_id);
435            }
436
437            // 13: Build index.orders_emulated -> {ClientOrderId}
438            if let Some(emulation_trigger) = order.emulation_trigger()
439                && emulation_trigger != TriggerType::NoTrigger
440                && !order.is_closed()
441            {
442                self.index.orders_emulated.insert(*client_order_id);
443            }
444
445            // 14: Build index.orders_inflight -> {ClientOrderId}
446            if order.is_inflight() {
447                self.index.orders_inflight.insert(*client_order_id);
448            }
449
450            // 15: Build index.strategies -> {StrategyId}
451            self.index.strategies.insert(strategy_id);
452
453            // 16: Build index.strategies -> {ExecAlgorithmId}
454            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
455                self.index.exec_algorithms.insert(exec_algorithm_id);
456            }
457        }
458
459        // Index positions
460        for (position_id, position) in &self.positions {
461            let instrument_id = position.instrument_id;
462            let venue = instrument_id.venue;
463            let strategy_id = position.strategy_id;
464
465            // 1: Build index.venue_positions -> {Venue, {PositionId}}
466            self.index
467                .venue_positions
468                .entry(venue)
469                .or_default()
470                .insert(*position_id);
471
472            // 2: Build index.position_strategy -> {PositionId, StrategyId}
473            self.index
474                .position_strategy
475                .insert(*position_id, position.strategy_id);
476
477            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
478            self.index
479                .position_orders
480                .entry(*position_id)
481                .or_default()
482                .extend(position.client_order_ids());
483
484            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
485            self.index
486                .instrument_positions
487                .entry(instrument_id)
488                .or_default()
489                .insert(*position_id);
490
491            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
492            self.index
493                .strategy_positions
494                .entry(strategy_id)
495                .or_default()
496                .insert(*position_id);
497
498            // 6: Build index.account_positions -> {AccountId, {PositionId}}
499            self.index
500                .account_positions
501                .entry(position.account_id)
502                .or_default()
503                .insert(*position_id);
504
505            // 7: Build index.positions -> {PositionId}
506            self.index.positions.insert(*position_id);
507
508            // 8: Build index.positions_open -> {PositionId}
509            if position.is_open() {
510                self.index.positions_open.insert(*position_id);
511            }
512
513            // 9: Build index.positions_closed -> {PositionId}
514            if position.is_closed() {
515                self.index.positions_closed.insert(*position_id);
516            }
517
518            // 10: Build index.strategies -> {StrategyId}
519            self.index.strategies.insert(strategy_id);
520        }
521    }
522
523    /// Returns whether the cache has a backing database.
524    #[must_use]
525    pub const fn has_backing(&self) -> bool {
526        self.config.database.is_some()
527    }
528
529    // Calculate the unrealized profit and loss (PnL) for `position`.
530    #[must_use]
531    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
532        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
533            quote
534        } else {
535            log::warn!(
536                "Cannot calculate unrealized PnL for {}, no quotes for {}",
537                position.id,
538                position.instrument_id
539            );
540            return None;
541        };
542
543        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
544        let last = match position.side {
545            PositionSide::Flat | PositionSide::NoPositionSide => {
546                return Some(Money::new(0.0, position.settlement_currency));
547            }
548            PositionSide::Long => quote.bid_price,
549            PositionSide::Short => quote.ask_price,
550        };
551
552        Some(position.unrealized_pnl(last))
553    }
554
555    /// Checks integrity of data within the cache.
556    ///
557    /// All data should be loaded from the database prior to this call.
558    /// If an error is found then a log error message will also be produced.
559    ///
560    /// # Panics
561    ///
562    /// Panics if failure calling system clock.
563    #[must_use]
564    pub fn check_integrity(&mut self) -> bool {
565        let mut error_count = 0;
566        let failure = "Integrity failure";
567
568        // Get current timestamp in microseconds
569        let timestamp_us = SystemTime::now()
570            .duration_since(UNIX_EPOCH)
571            .expect("Time went backwards")
572            .as_micros();
573
574        log::info!("Checking data integrity");
575
576        // Check object caches
577        for account_id in self.accounts.keys() {
578            if !self
579                .index
580                .venue_account
581                .contains_key(&account_id.get_issuer())
582            {
583                log::error!(
584                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
585                );
586                error_count += 1;
587            }
588        }
589
590        for (client_order_id, order) in &self.orders {
591            if !self.index.order_strategy.contains_key(client_order_id) {
592                log::error!(
593                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
594                );
595                error_count += 1;
596            }
597
598            if !self.index.orders.contains(client_order_id) {
599                log::error!(
600                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
601                );
602                error_count += 1;
603            }
604
605            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
606                log::error!(
607                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
608                );
609                error_count += 1;
610            }
611
612            if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
613            {
614                log::error!(
615                    "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
616                );
617                error_count += 1;
618            }
619
620            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
621                log::error!(
622                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
623                );
624                error_count += 1;
625            }
626
627            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
628                log::error!(
629                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
630                );
631                error_count += 1;
632            }
633
634            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
635                if !self
636                    .index
637                    .exec_algorithm_orders
638                    .contains_key(&exec_algorithm_id)
639                {
640                    log::error!(
641                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
642                    );
643                    error_count += 1;
644                }
645
646                if order.exec_spawn_id().is_none()
647                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
648                {
649                    log::error!(
650                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
651                    );
652                    error_count += 1;
653                }
654            }
655        }
656
657        for (position_id, position) in &self.positions {
658            if !self.index.position_strategy.contains_key(position_id) {
659                log::error!(
660                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
661                );
662                error_count += 1;
663            }
664
665            if !self.index.position_orders.contains_key(position_id) {
666                log::error!(
667                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
668                );
669                error_count += 1;
670            }
671
672            if !self.index.positions.contains(position_id) {
673                log::error!(
674                    "{failure} in positions: {position_id} not found in `self.index.positions`",
675                );
676                error_count += 1;
677            }
678
679            if position.is_open() && !self.index.positions_open.contains(position_id) {
680                log::error!(
681                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
682                );
683                error_count += 1;
684            }
685
686            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
687                log::error!(
688                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
689                );
690                error_count += 1;
691            }
692        }
693
694        // Check indexes
695        for account_id in self.index.venue_account.values() {
696            if !self.accounts.contains_key(account_id) {
697                log::error!(
698                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
699                );
700                error_count += 1;
701            }
702        }
703
704        for client_order_id in self.index.venue_order_ids.values() {
705            if !self.orders.contains_key(client_order_id) {
706                log::error!(
707                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
708                );
709                error_count += 1;
710            }
711        }
712
713        for client_order_id in self.index.client_order_ids.keys() {
714            if !self.orders.contains_key(client_order_id) {
715                log::error!(
716                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
717                );
718                error_count += 1;
719            }
720        }
721
722        for client_order_id in self.index.order_position.keys() {
723            if !self.orders.contains_key(client_order_id) {
724                log::error!(
725                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
726                );
727                error_count += 1;
728            }
729        }
730
731        // Check indexes
732        for client_order_id in self.index.order_strategy.keys() {
733            if !self.orders.contains_key(client_order_id) {
734                log::error!(
735                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
736                );
737                error_count += 1;
738            }
739        }
740
741        for position_id in self.index.position_strategy.keys() {
742            if !self.positions.contains_key(position_id) {
743                log::error!(
744                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
745                );
746                error_count += 1;
747            }
748        }
749
750        for position_id in self.index.position_orders.keys() {
751            if !self.positions.contains_key(position_id) {
752                log::error!(
753                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
754                );
755                error_count += 1;
756            }
757        }
758
759        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
760            for client_order_id in client_order_ids {
761                if !self.orders.contains_key(client_order_id) {
762                    log::error!(
763                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
764                    );
765                    error_count += 1;
766                }
767            }
768        }
769
770        for instrument_id in self.index.instrument_positions.keys() {
771            if !self.index.instrument_orders.contains_key(instrument_id) {
772                log::error!(
773                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
774                );
775                error_count += 1;
776            }
777        }
778
779        for client_order_ids in self.index.strategy_orders.values() {
780            for client_order_id in client_order_ids {
781                if !self.orders.contains_key(client_order_id) {
782                    log::error!(
783                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
784                    );
785                    error_count += 1;
786                }
787            }
788        }
789
790        for position_ids in self.index.strategy_positions.values() {
791            for position_id in position_ids {
792                if !self.positions.contains_key(position_id) {
793                    log::error!(
794                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
795                    );
796                    error_count += 1;
797                }
798            }
799        }
800
801        for client_order_id in &self.index.orders {
802            if !self.orders.contains_key(client_order_id) {
803                log::error!(
804                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
805                );
806                error_count += 1;
807            }
808        }
809
810        for client_order_id in &self.index.orders_emulated {
811            if !self.orders.contains_key(client_order_id) {
812                log::error!(
813                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
814                );
815                error_count += 1;
816            }
817        }
818
819        for client_order_id in &self.index.orders_active_local {
820            if !self.orders.contains_key(client_order_id) {
821                log::error!(
822                    "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
823                );
824                error_count += 1;
825            }
826        }
827
828        for client_order_id in &self.index.orders_inflight {
829            if !self.orders.contains_key(client_order_id) {
830                log::error!(
831                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
832                );
833                error_count += 1;
834            }
835        }
836
837        for client_order_id in &self.index.orders_open {
838            if !self.orders.contains_key(client_order_id) {
839                log::error!(
840                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
841                );
842                error_count += 1;
843            }
844        }
845
846        for client_order_id in &self.index.orders_closed {
847            if !self.orders.contains_key(client_order_id) {
848                log::error!(
849                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
850                );
851                error_count += 1;
852            }
853        }
854
855        for position_id in &self.index.positions {
856            if !self.positions.contains_key(position_id) {
857                log::error!(
858                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
859                );
860                error_count += 1;
861            }
862        }
863
864        for position_id in &self.index.positions_open {
865            if !self.positions.contains_key(position_id) {
866                log::error!(
867                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
868                );
869                error_count += 1;
870            }
871        }
872
873        for position_id in &self.index.positions_closed {
874            if !self.positions.contains_key(position_id) {
875                log::error!(
876                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
877                );
878                error_count += 1;
879            }
880        }
881
882        for strategy_id in &self.index.strategies {
883            if !self.index.strategy_orders.contains_key(strategy_id) {
884                log::error!(
885                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
886                );
887                error_count += 1;
888            }
889        }
890
891        for exec_algorithm_id in &self.index.exec_algorithms {
892            if !self
893                .index
894                .exec_algorithm_orders
895                .contains_key(exec_algorithm_id)
896            {
897                log::error!(
898                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
899                );
900                error_count += 1;
901            }
902        }
903
904        let total_us = SystemTime::now()
905            .duration_since(UNIX_EPOCH)
906            .expect("Time went backwards")
907            .as_micros()
908            - timestamp_us;
909
910        if error_count == 0 {
911            log::info!("Integrity check passed in {total_us}μs");
912            true
913        } else {
914            log::error!(
915                "Integrity check failed with {error_count} error{} in {total_us}μs",
916                if error_count == 1 { "" } else { "s" },
917            );
918            false
919        }
920    }
921
922    /// Checks for any residual open state and log warnings if any are found.
923    ///
924    ///'Open state' is considered to be open orders and open positions.
925    #[must_use]
926    pub fn check_residuals(&self) -> bool {
927        log::debug!("Checking residuals");
928
929        let mut residuals = false;
930
931        // Check for any open orders
932        for order in self.orders_open(None, None, None, None, None) {
933            residuals = true;
934            log::warn!("Residual {order}");
935        }
936
937        // Check for any open positions
938        for position in self.positions_open(None, None, None, None, None) {
939            residuals = true;
940            log::warn!("Residual {position}");
941        }
942
943        residuals
944    }
945
946    /// Purges all closed orders from the cache that are older than `buffer_secs`.
947    ///
948    ///
949    /// Only orders that have been closed for at least this amount of time will be purged.
950    /// A value of 0 means purge all closed orders regardless of when they were closed.
951    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
952        log::debug!(
953            "Purging closed orders{}",
954            if buffer_secs > 0 {
955                format!(" with buffer_secs={buffer_secs}")
956            } else {
957                String::new()
958            }
959        );
960
961        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
962
963        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
964
965        'outer: for client_order_id in self.index.orders_closed.clone() {
966            if let Some(order) = self.orders.get(&client_order_id)
967                && order.is_closed()
968                && let Some(ts_closed) = order.ts_closed()
969                && ts_closed + buffer_ns <= ts_now
970            {
971                // Check any linked orders (contingency orders)
972                if let Some(linked_order_ids) = order.linked_order_ids() {
973                    for linked_order_id in linked_order_ids {
974                        if let Some(linked_order) = self.orders.get(linked_order_id)
975                            && linked_order.is_open()
976                        {
977                            // Do not purge if linked order still open
978                            continue 'outer;
979                        }
980                    }
981                }
982
983                if let Some(order_list_id) = order.order_list_id() {
984                    affected_order_list_ids.insert(order_list_id);
985                }
986
987                self.purge_order(client_order_id);
988            }
989        }
990
991        for order_list_id in affected_order_list_ids {
992            if let Some(order_list) = self.order_lists.get(&order_list_id) {
993                let all_purged = order_list
994                    .client_order_ids
995                    .iter()
996                    .all(|id| !self.orders.contains_key(id));
997
998                if all_purged {
999                    self.order_lists.remove(&order_list_id);
1000                    log::info!("Purged {order_list_id}");
1001                }
1002            }
1003        }
1004    }
1005
1006    /// Purges all closed positions from the cache that are older than `buffer_secs`.
1007    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1008        log::debug!(
1009            "Purging closed positions{}",
1010            if buffer_secs > 0 {
1011                format!(" with buffer_secs={buffer_secs}")
1012            } else {
1013                String::new()
1014            }
1015        );
1016
1017        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1018
1019        for position_id in self.index.positions_closed.clone() {
1020            if let Some(position) = self.positions.get(&position_id)
1021                && position.is_closed()
1022                && let Some(ts_closed) = position.ts_closed
1023                && ts_closed + buffer_ns <= ts_now
1024            {
1025                self.purge_position(position_id);
1026            }
1027        }
1028    }
1029
1030    /// Purges the order with the `client_order_id` from the cache (if found).
1031    ///
1032    /// For safety, an order is prevented from being purged if it's open.
1033    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1034        // Check if order exists and is safe to purge before removing
1035        let order = self.orders.get(&client_order_id).cloned();
1036
1037        // Prevent purging open orders
1038        if let Some(ref ord) = order
1039            && ord.is_open()
1040        {
1041            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1042            return;
1043        }
1044
1045        // If order exists in cache, remove it and clean up order-specific indices
1046        if let Some(ref ord) = order {
1047            // Safe to purge
1048            self.orders.remove(&client_order_id);
1049
1050            // Remove order from venue index
1051            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1052            {
1053                venue_orders.remove(&client_order_id);
1054                if venue_orders.is_empty() {
1055                    self.index.venue_orders.remove(&ord.instrument_id().venue);
1056                }
1057            }
1058
1059            // Remove venue order ID index if exists
1060            if let Some(venue_order_id) = ord.venue_order_id() {
1061                self.index.venue_order_ids.remove(&venue_order_id);
1062            }
1063
1064            // Remove from instrument orders index
1065            if let Some(instrument_orders) =
1066                self.index.instrument_orders.get_mut(&ord.instrument_id())
1067            {
1068                instrument_orders.remove(&client_order_id);
1069                if instrument_orders.is_empty() {
1070                    self.index.instrument_orders.remove(&ord.instrument_id());
1071                }
1072            }
1073
1074            // Remove from position orders index if associated with a position
1075            if let Some(position_id) = ord.position_id()
1076                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1077            {
1078                position_orders.remove(&client_order_id);
1079                if position_orders.is_empty() {
1080                    self.index.position_orders.remove(&position_id);
1081                }
1082            }
1083
1084            // Remove from exec algorithm orders index if it has an exec algorithm
1085            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1086                && let Some(exec_algorithm_orders) =
1087                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1088            {
1089                exec_algorithm_orders.remove(&client_order_id);
1090                if exec_algorithm_orders.is_empty() {
1091                    self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1092                }
1093            }
1094
1095            // Clean up strategy orders reverse index
1096            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1097                strategy_orders.remove(&client_order_id);
1098                if strategy_orders.is_empty() {
1099                    self.index.strategy_orders.remove(&ord.strategy_id());
1100                }
1101            }
1102
1103            // Clean up account orders index
1104            if let Some(account_id) = ord.account_id()
1105                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1106            {
1107                account_orders.remove(&client_order_id);
1108                if account_orders.is_empty() {
1109                    self.index.account_orders.remove(&account_id);
1110                }
1111            }
1112
1113            // Clean up exec spawn reverse index (if this order is a spawned child)
1114            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1115                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1116            {
1117                spawn_orders.remove(&client_order_id);
1118                if spawn_orders.is_empty() {
1119                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1120                }
1121            }
1122
1123            log::info!("Purged order {client_order_id}");
1124        } else {
1125            log::warn!("Order {client_order_id} not found when purging");
1126        }
1127
1128        // Always clean up order indices (even if order was not in cache)
1129        self.index.order_position.remove(&client_order_id);
1130        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1131        self.index.order_client.remove(&client_order_id);
1132        self.index.client_order_ids.remove(&client_order_id);
1133
1134        // Clean up reverse index when order not in cache (using forward index)
1135        if let Some(strategy_id) = strategy_id
1136            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1137        {
1138            strategy_orders.remove(&client_order_id);
1139            if strategy_orders.is_empty() {
1140                self.index.strategy_orders.remove(&strategy_id);
1141            }
1142        }
1143
1144        // Remove spawn parent entry if this order was a spawn root
1145        self.index.exec_spawn_orders.remove(&client_order_id);
1146
1147        self.index.orders.remove(&client_order_id);
1148        self.index.orders_active_local.remove(&client_order_id);
1149        self.index.orders_open.remove(&client_order_id);
1150        self.index.orders_closed.remove(&client_order_id);
1151        self.index.orders_emulated.remove(&client_order_id);
1152        self.index.orders_inflight.remove(&client_order_id);
1153        self.index.orders_pending_cancel.remove(&client_order_id);
1154    }
1155
1156    /// Purges the position with the `position_id` from the cache (if found).
1157    ///
1158    /// For safety, a position is prevented from being purged if it's open.
1159    pub fn purge_position(&mut self, position_id: PositionId) {
1160        // Check if position exists and is safe to purge before removing
1161        let position = self.positions.get(&position_id).cloned();
1162
1163        // Prevent purging open positions
1164        if let Some(ref pos) = position
1165            && pos.is_open()
1166        {
1167            log::warn!("Position {position_id} found open when purging, skipping purge");
1168            return;
1169        }
1170
1171        // If position exists in cache, remove it and clean up position-specific indices
1172        if let Some(ref pos) = position {
1173            self.positions.remove(&position_id);
1174
1175            // Remove from venue positions index
1176            if let Some(venue_positions) =
1177                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1178            {
1179                venue_positions.remove(&position_id);
1180                if venue_positions.is_empty() {
1181                    self.index.venue_positions.remove(&pos.instrument_id.venue);
1182                }
1183            }
1184
1185            // Remove from instrument positions index
1186            if let Some(instrument_positions) =
1187                self.index.instrument_positions.get_mut(&pos.instrument_id)
1188            {
1189                instrument_positions.remove(&position_id);
1190                if instrument_positions.is_empty() {
1191                    self.index.instrument_positions.remove(&pos.instrument_id);
1192                }
1193            }
1194
1195            // Remove from strategy positions index
1196            if let Some(strategy_positions) =
1197                self.index.strategy_positions.get_mut(&pos.strategy_id)
1198            {
1199                strategy_positions.remove(&position_id);
1200                if strategy_positions.is_empty() {
1201                    self.index.strategy_positions.remove(&pos.strategy_id);
1202                }
1203            }
1204
1205            // Remove from account positions index
1206            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1207                account_positions.remove(&position_id);
1208                if account_positions.is_empty() {
1209                    self.index.account_positions.remove(&pos.account_id);
1210                }
1211            }
1212
1213            // Remove position ID from orders that reference it
1214            for client_order_id in pos.client_order_ids() {
1215                self.index.order_position.remove(&client_order_id);
1216            }
1217
1218            log::info!("Purged position {position_id}");
1219        } else {
1220            log::warn!("Position {position_id} not found when purging");
1221        }
1222
1223        // Always clean up position indices (even if position not in cache)
1224        self.index.position_strategy.remove(&position_id);
1225        self.index.position_orders.remove(&position_id);
1226        self.index.positions.remove(&position_id);
1227        self.index.positions_open.remove(&position_id);
1228        self.index.positions_closed.remove(&position_id);
1229
1230        // Always clean up position snapshots (even if position not in cache)
1231        self.position_snapshots.remove(&position_id);
1232    }
1233
1234    /// Purges all account state events which are outside the lookback window.
1235    ///
1236    /// Only events which are outside the lookback window will be purged.
1237    /// A value of 0 means purge all account state events.
1238    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1239        log::debug!(
1240            "Purging account events{}",
1241            if lookback_secs > 0 {
1242                format!(" with lookback_secs={lookback_secs}")
1243            } else {
1244                String::new()
1245            }
1246        );
1247
1248        for account in self.accounts.values_mut() {
1249            let event_count = account.event_count();
1250            account.purge_account_events(ts_now, lookback_secs);
1251            let count_diff = event_count - account.event_count();
1252            if count_diff > 0 {
1253                log::info!(
1254                    "Purged {} event(s) from account {}",
1255                    count_diff,
1256                    account.id()
1257                );
1258            }
1259        }
1260    }
1261
1262    /// Clears the caches index.
1263    pub fn clear_index(&mut self) {
1264        self.index.clear();
1265        log::debug!("Cleared index");
1266    }
1267
1268    /// Resets the cache.
1269    ///
1270    /// All stateful fields are reset to their initial value.
1271    pub fn reset(&mut self) {
1272        log::debug!("Resetting cache");
1273
1274        self.general.clear();
1275        self.currencies.clear();
1276        self.instruments.clear();
1277        self.synthetics.clear();
1278        self.books.clear();
1279        self.own_books.clear();
1280        self.quotes.clear();
1281        self.trades.clear();
1282        self.mark_xrates.clear();
1283        self.mark_prices.clear();
1284        self.index_prices.clear();
1285        self.funding_rates.clear();
1286        self.bars.clear();
1287        self.accounts.clear();
1288        self.orders.clear();
1289        self.order_lists.clear();
1290        self.positions.clear();
1291        self.position_snapshots.clear();
1292        self.greeks.clear();
1293        self.yield_curves.clear();
1294
1295        #[cfg(feature = "defi")]
1296        {
1297            self.defi.pools.clear();
1298            self.defi.pool_profilers.clear();
1299        }
1300
1301        self.clear_index();
1302
1303        log::info!("Reset cache");
1304    }
1305
1306    /// Dispose of the cache which will close any underlying database adapter.
1307    ///
1308    /// If closing the database connection fails, an error is logged.
1309    pub fn dispose(&mut self) {
1310        self.reset();
1311
1312        if let Some(database) = &mut self.database
1313            && let Err(e) = database.close()
1314        {
1315            log::error!("Failed to close database during dispose: {e}");
1316        }
1317    }
1318
1319    /// Flushes the caches database which permanently removes all persisted data.
1320    ///
1321    /// If flushing the database connection fails, an error is logged.
1322    pub fn flush_db(&mut self) {
1323        if let Some(database) = &mut self.database
1324            && let Err(e) = database.flush()
1325        {
1326            log::error!("Failed to flush database: {e}");
1327        }
1328    }
1329
1330    /// Adds a raw bytes `value` to the cache under the `key`.
1331    ///
1332    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1333    ///
1334    /// # Errors
1335    ///
1336    /// Returns an error if persisting the entry to the backing database fails.
1337    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1338        check_valid_string_ascii(key, stringify!(key))?;
1339        check_predicate_false(value.is_empty(), stringify!(value))?;
1340
1341        log::debug!("Adding general {key}");
1342        self.general.insert(key.to_string(), value.clone());
1343
1344        if let Some(database) = &mut self.database {
1345            database.add(key.to_string(), value)?;
1346        }
1347        Ok(())
1348    }
1349
1350    /// Adds an `OrderBook` to the cache.
1351    ///
1352    /// # Errors
1353    ///
1354    /// Returns an error if persisting the order book to the backing database fails.
1355    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1356        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1357
1358        if self.config.save_market_data
1359            && let Some(database) = &mut self.database
1360        {
1361            database.add_order_book(&book)?;
1362        }
1363
1364        self.books.insert(book.instrument_id, book);
1365        Ok(())
1366    }
1367
1368    /// Adds an `OwnOrderBook` to the cache.
1369    ///
1370    /// # Errors
1371    ///
1372    /// Returns an error if persisting the own order book fails.
1373    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1374        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1375
1376        self.own_books.insert(own_book.instrument_id, own_book);
1377        Ok(())
1378    }
1379
1380    /// Adds the `mark_price` update to the cache.
1381    ///
1382    /// # Errors
1383    ///
1384    /// Returns an error if persisting the mark price to the backing database fails.
1385    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1386        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1387
1388        if self.config.save_market_data {
1389            // TODO: Placeholder and return Result for consistency
1390        }
1391
1392        let mark_prices_deque = self
1393            .mark_prices
1394            .entry(mark_price.instrument_id)
1395            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1396        mark_prices_deque.push_front(mark_price);
1397        Ok(())
1398    }
1399
1400    /// Adds the `index_price` update to the cache.
1401    ///
1402    /// # Errors
1403    ///
1404    /// Returns an error if persisting the index price to the backing database fails.
1405    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1406        log::debug!(
1407            "Adding `IndexPriceUpdate` for {}",
1408            index_price.instrument_id
1409        );
1410
1411        if self.config.save_market_data {
1412            // TODO: Placeholder and return Result for consistency
1413        }
1414
1415        let index_prices_deque = self
1416            .index_prices
1417            .entry(index_price.instrument_id)
1418            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1419        index_prices_deque.push_front(index_price);
1420        Ok(())
1421    }
1422
1423    /// Adds the `funding_rate` update to the cache.
1424    ///
1425    /// # Errors
1426    ///
1427    /// Returns an error if persisting the funding rate update to the backing database fails.
1428    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1429        log::debug!(
1430            "Adding `FundingRateUpdate` for {}",
1431            funding_rate.instrument_id
1432        );
1433
1434        if self.config.save_market_data {
1435            // TODO: Placeholder and return Result for consistency
1436        }
1437
1438        let funding_rates_deque = self
1439            .funding_rates
1440            .entry(funding_rate.instrument_id)
1441            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1442        funding_rates_deque.push_front(funding_rate);
1443        Ok(())
1444    }
1445
1446    /// Adds the given `funding rates` to the cache.
1447    ///
1448    /// # Errors
1449    ///
1450    /// Returns an error if persisting the trade ticks to the backing database fails.
1451    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1452        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1453
1454        let instrument_id = funding_rates[0].instrument_id;
1455        log::debug!(
1456            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1457            funding_rates.len()
1458        );
1459
1460        if self.config.save_market_data
1461            && let Some(database) = &mut self.database
1462        {
1463            for funding_rate in funding_rates {
1464                database.add_funding_rate(funding_rate)?;
1465            }
1466        }
1467
1468        let funding_rate_deque = self
1469            .funding_rates
1470            .entry(instrument_id)
1471            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1472
1473        for funding_rate in funding_rates {
1474            funding_rate_deque.push_front(*funding_rate);
1475        }
1476        Ok(())
1477    }
1478
1479    /// Adds the `quote` tick to the cache.
1480    ///
1481    /// # Errors
1482    ///
1483    /// Returns an error if persisting the quote tick to the backing database fails.
1484    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1485        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1486
1487        if self.config.save_market_data
1488            && let Some(database) = &mut self.database
1489        {
1490            database.add_quote(&quote)?;
1491        }
1492
1493        let quotes_deque = self
1494            .quotes
1495            .entry(quote.instrument_id)
1496            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1497        quotes_deque.push_front(quote);
1498        Ok(())
1499    }
1500
1501    /// Adds the `quotes` to the cache.
1502    ///
1503    /// # Errors
1504    ///
1505    /// Returns an error if persisting the quote ticks to the backing database fails.
1506    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1507        check_slice_not_empty(quotes, stringify!(quotes))?;
1508
1509        let instrument_id = quotes[0].instrument_id;
1510        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1511
1512        if self.config.save_market_data
1513            && let Some(database) = &mut self.database
1514        {
1515            for quote in quotes {
1516                database.add_quote(quote)?;
1517            }
1518        }
1519
1520        let quotes_deque = self
1521            .quotes
1522            .entry(instrument_id)
1523            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1524
1525        for quote in quotes {
1526            quotes_deque.push_front(*quote);
1527        }
1528        Ok(())
1529    }
1530
1531    /// Adds the `trade` tick to the cache.
1532    ///
1533    /// # Errors
1534    ///
1535    /// Returns an error if persisting the trade tick to the backing database fails.
1536    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1537        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1538
1539        if self.config.save_market_data
1540            && let Some(database) = &mut self.database
1541        {
1542            database.add_trade(&trade)?;
1543        }
1544
1545        let trades_deque = self
1546            .trades
1547            .entry(trade.instrument_id)
1548            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1549        trades_deque.push_front(trade);
1550        Ok(())
1551    }
1552
1553    /// Adds the give `trades` to the cache.
1554    ///
1555    /// # Errors
1556    ///
1557    /// Returns an error if persisting the trade ticks to the backing database fails.
1558    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1559        check_slice_not_empty(trades, stringify!(trades))?;
1560
1561        let instrument_id = trades[0].instrument_id;
1562        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1563
1564        if self.config.save_market_data
1565            && let Some(database) = &mut self.database
1566        {
1567            for trade in trades {
1568                database.add_trade(trade)?;
1569            }
1570        }
1571
1572        let trades_deque = self
1573            .trades
1574            .entry(instrument_id)
1575            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1576
1577        for trade in trades {
1578            trades_deque.push_front(*trade);
1579        }
1580        Ok(())
1581    }
1582
1583    /// Adds the `bar` to the cache.
1584    ///
1585    /// # Errors
1586    ///
1587    /// Returns an error if persisting the bar to the backing database fails.
1588    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1589        log::debug!("Adding `Bar` {}", bar.bar_type);
1590
1591        if self.config.save_market_data
1592            && let Some(database) = &mut self.database
1593        {
1594            database.add_bar(&bar)?;
1595        }
1596
1597        let bars = self
1598            .bars
1599            .entry(bar.bar_type)
1600            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1601        bars.push_front(bar);
1602        Ok(())
1603    }
1604
1605    /// Adds the `bars` to the cache.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns an error if persisting the bars to the backing database fails.
1610    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1611        check_slice_not_empty(bars, stringify!(bars))?;
1612
1613        let bar_type = bars[0].bar_type;
1614        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1615
1616        if self.config.save_market_data
1617            && let Some(database) = &mut self.database
1618        {
1619            for bar in bars {
1620                database.add_bar(bar)?;
1621            }
1622        }
1623
1624        let bars_deque = self
1625            .bars
1626            .entry(bar_type)
1627            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1628
1629        for bar in bars {
1630            bars_deque.push_front(*bar);
1631        }
1632        Ok(())
1633    }
1634
1635    /// Adds the `greeks` data to the cache.
1636    ///
1637    /// # Errors
1638    ///
1639    /// Returns an error if persisting the greeks data to the backing database fails.
1640    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1641        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1642
1643        if self.config.save_market_data
1644            && let Some(_database) = &mut self.database
1645        {
1646            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1647        }
1648
1649        self.greeks.insert(greeks.instrument_id, greeks);
1650        Ok(())
1651    }
1652
1653    /// Gets the greeks data for the `instrument_id`.
1654    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1655        self.greeks.get(instrument_id).cloned()
1656    }
1657
1658    /// Adds exchange-provided option greeks to the cache.
1659    pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1660        log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1661        self.option_greeks.insert(greeks.instrument_id, greeks);
1662    }
1663
1664    /// Gets a reference to the exchange-provided option greeks for the `instrument_id`.
1665    #[must_use]
1666    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1667        self.option_greeks.get(instrument_id)
1668    }
1669
1670    /// Adds the `yield_curve` data to the cache.
1671    ///
1672    /// # Errors
1673    ///
1674    /// Returns an error if persisting the yield curve data to the backing database fails.
1675    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1676        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1677
1678        if self.config.save_market_data
1679            && let Some(_database) = &mut self.database
1680        {
1681            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1682        }
1683
1684        self.yield_curves
1685            .insert(yield_curve.curve_name.clone(), yield_curve);
1686        Ok(())
1687    }
1688
1689    /// Gets the yield curve for the `key`.
1690    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1691        self.yield_curves.get(key).map(|curve| {
1692            let curve_clone = curve.clone();
1693            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1694                as Box<dyn Fn(f64) -> f64>
1695        })
1696    }
1697
1698    /// Adds the `currency` to the cache.
1699    ///
1700    /// # Errors
1701    ///
1702    /// Returns an error if persisting the currency to the backing database fails.
1703    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1704        if self.currencies.contains_key(&currency.code) {
1705            return Ok(());
1706        }
1707        log::debug!("Adding `Currency` {}", currency.code);
1708
1709        if let Some(database) = &mut self.database {
1710            database.add_currency(&currency)?;
1711        }
1712
1713        self.currencies.insert(currency.code, currency);
1714        Ok(())
1715    }
1716
1717    /// Adds the `instrument` to the cache.
1718    ///
1719    /// # Errors
1720    ///
1721    /// Returns an error if persisting the instrument to the backing database fails.
1722    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1723        log::debug!("Adding `Instrument` {}", instrument.id());
1724
1725        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
1726        if let Some(base_currency) = instrument.base_currency() {
1727            self.add_currency(base_currency)?;
1728        }
1729        self.add_currency(instrument.quote_currency())?;
1730        self.add_currency(instrument.settlement_currency())?;
1731
1732        if let Some(database) = &mut self.database {
1733            database.add_instrument(&instrument)?;
1734        }
1735
1736        self.instruments.insert(instrument.id(), instrument);
1737        Ok(())
1738    }
1739
1740    /// Adds the `synthetic` instrument to the cache.
1741    ///
1742    /// # Errors
1743    ///
1744    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1745    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1746        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1747
1748        if let Some(database) = &mut self.database {
1749            database.add_synthetic(&synthetic)?;
1750        }
1751
1752        self.synthetics.insert(synthetic.id, synthetic);
1753        Ok(())
1754    }
1755
1756    /// Adds the `account` to the cache.
1757    ///
1758    /// # Errors
1759    ///
1760    /// Returns an error if persisting the account to the backing database fails.
1761    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1762        log::debug!("Adding `Account` {}", account.id());
1763
1764        if let Some(database) = &mut self.database {
1765            database.add_account(&account)?;
1766        }
1767
1768        let account_id = account.id();
1769        self.accounts.insert(account_id, account);
1770        self.index
1771            .venue_account
1772            .insert(account_id.get_issuer(), account_id);
1773        Ok(())
1774    }
1775
1776    /// Indexes the `client_order_id` with the `venue_order_id`.
1777    ///
1778    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1779    ///
1780    /// # Errors
1781    ///
1782    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1783    pub fn add_venue_order_id(
1784        &mut self,
1785        client_order_id: &ClientOrderId,
1786        venue_order_id: &VenueOrderId,
1787        overwrite: bool,
1788    ) -> anyhow::Result<()> {
1789        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1790            && !overwrite
1791            && existing_venue_order_id != venue_order_id
1792        {
1793            anyhow::bail!(
1794                "Existing {existing_venue_order_id} for {client_order_id}
1795                    did not match the given {venue_order_id}.
1796                    If you are writing a test then try a different `venue_order_id`,
1797                    otherwise this is probably a bug."
1798            );
1799        }
1800
1801        self.index
1802            .client_order_ids
1803            .insert(*client_order_id, *venue_order_id);
1804        self.index
1805            .venue_order_ids
1806            .insert(*venue_order_id, *client_order_id);
1807
1808        Ok(())
1809    }
1810
1811    /// Adds the `order` to the cache indexed with any given identifiers.
1812    ///
1813    /// # Parameters
1814    ///
1815    /// `override_existing`: If the added order should 'override' any existing order and replace
1816    /// it in the cache. This is currently used for emulated orders which are
1817    /// being released and transformed into another type.
1818    ///
1819    /// # Errors
1820    ///
1821    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1822    pub fn add_order(
1823        &mut self,
1824        order: OrderAny,
1825        position_id: Option<PositionId>,
1826        client_id: Option<ClientId>,
1827        replace_existing: bool,
1828    ) -> anyhow::Result<()> {
1829        let instrument_id = order.instrument_id();
1830        let venue = instrument_id.venue;
1831        let client_order_id = order.client_order_id();
1832        let strategy_id = order.strategy_id();
1833        let exec_algorithm_id = order.exec_algorithm_id();
1834        let exec_spawn_id = order.exec_spawn_id();
1835
1836        if !replace_existing {
1837            check_key_not_in_map(
1838                &client_order_id,
1839                &self.orders,
1840                stringify!(client_order_id),
1841                stringify!(orders),
1842            )?;
1843        }
1844
1845        log::debug!("Adding {order:?}");
1846
1847        self.index.orders.insert(client_order_id);
1848
1849        if order.is_active_local() {
1850            self.index.orders_active_local.insert(client_order_id);
1851        }
1852        self.index
1853            .order_strategy
1854            .insert(client_order_id, strategy_id);
1855        self.index.strategies.insert(strategy_id);
1856
1857        // Update venue -> orders index
1858        self.index
1859            .venue_orders
1860            .entry(venue)
1861            .or_default()
1862            .insert(client_order_id);
1863
1864        // Update instrument -> orders index
1865        self.index
1866            .instrument_orders
1867            .entry(instrument_id)
1868            .or_default()
1869            .insert(client_order_id);
1870
1871        // Update strategy -> orders index
1872        self.index
1873            .strategy_orders
1874            .entry(strategy_id)
1875            .or_default()
1876            .insert(client_order_id);
1877
1878        // Update account -> orders index (if account_id known at creation)
1879        if let Some(account_id) = order.account_id() {
1880            self.index
1881                .account_orders
1882                .entry(account_id)
1883                .or_default()
1884                .insert(client_order_id);
1885        }
1886
1887        // Update exec_algorithm -> orders index
1888        if let Some(exec_algorithm_id) = exec_algorithm_id {
1889            self.index.exec_algorithms.insert(exec_algorithm_id);
1890
1891            self.index
1892                .exec_algorithm_orders
1893                .entry(exec_algorithm_id)
1894                .or_default()
1895                .insert(client_order_id);
1896        }
1897
1898        // Update exec_spawn -> orders index
1899        if let Some(exec_spawn_id) = exec_spawn_id {
1900            self.index
1901                .exec_spawn_orders
1902                .entry(exec_spawn_id)
1903                .or_default()
1904                .insert(client_order_id);
1905        }
1906
1907        // Update emulation index
1908        if let Some(emulation_trigger) = order.emulation_trigger()
1909            && emulation_trigger != TriggerType::NoTrigger
1910        {
1911            self.index.orders_emulated.insert(client_order_id);
1912        }
1913
1914        // Index position ID if provided
1915        if let Some(position_id) = position_id {
1916            self.add_position_id(
1917                &position_id,
1918                &order.instrument_id().venue,
1919                &client_order_id,
1920                &strategy_id,
1921            )?;
1922        }
1923
1924        // Index client ID if provided
1925        if let Some(client_id) = client_id {
1926            self.index.order_client.insert(client_order_id, client_id);
1927            log::debug!("Indexed {client_id:?}");
1928        }
1929
1930        if let Some(database) = &mut self.database {
1931            database.add_order(&order, client_id)?;
1932            // TODO: Implement
1933            // if self.config.snapshot_orders {
1934            //     database.snapshot_order_state(order)?;
1935            // }
1936        }
1937
1938        self.orders.insert(client_order_id, order);
1939
1940        Ok(())
1941    }
1942
1943    /// Adds the `order_list` to the cache.
1944    ///
1945    /// # Errors
1946    ///
1947    /// Returns an error if the order list ID is already contained in the cache.
1948    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1949        let order_list_id = order_list.id;
1950        check_key_not_in_map(
1951            &order_list_id,
1952            &self.order_lists,
1953            stringify!(order_list_id),
1954            stringify!(order_lists),
1955        )?;
1956
1957        log::debug!("Adding {order_list:?}");
1958        self.order_lists.insert(order_list_id, order_list);
1959        Ok(())
1960    }
1961
1962    /// Indexes the `position_id` with the other given IDs.
1963    ///
1964    /// # Errors
1965    ///
1966    /// Returns an error if indexing position ID in the backing database fails.
1967    pub fn add_position_id(
1968        &mut self,
1969        position_id: &PositionId,
1970        venue: &Venue,
1971        client_order_id: &ClientOrderId,
1972        strategy_id: &StrategyId,
1973    ) -> anyhow::Result<()> {
1974        self.index
1975            .order_position
1976            .insert(*client_order_id, *position_id);
1977
1978        // Index: ClientOrderId -> PositionId
1979        if let Some(database) = &mut self.database {
1980            database.index_order_position(*client_order_id, *position_id)?;
1981        }
1982
1983        // Index: PositionId -> StrategyId
1984        self.index
1985            .position_strategy
1986            .insert(*position_id, *strategy_id);
1987
1988        // Index: PositionId -> set[ClientOrderId]
1989        self.index
1990            .position_orders
1991            .entry(*position_id)
1992            .or_default()
1993            .insert(*client_order_id);
1994
1995        // Index: StrategyId -> set[PositionId]
1996        self.index
1997            .strategy_positions
1998            .entry(*strategy_id)
1999            .or_default()
2000            .insert(*position_id);
2001
2002        // Index: Venue -> set[PositionId]
2003        self.index
2004            .venue_positions
2005            .entry(*venue)
2006            .or_default()
2007            .insert(*position_id);
2008
2009        Ok(())
2010    }
2011
2012    /// Adds the `position` to the cache.
2013    ///
2014    /// # Errors
2015    ///
2016    /// Returns an error if persisting the position to the backing database fails.
2017    pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2018        self.positions.insert(position.id, position.clone());
2019        self.index.positions.insert(position.id);
2020        self.index.positions_open.insert(position.id);
2021        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
2022
2023        log::debug!("Adding {position}");
2024
2025        self.add_position_id(
2026            &position.id,
2027            &position.instrument_id.venue,
2028            &position.opening_order_id,
2029            &position.strategy_id,
2030        )?;
2031
2032        let venue = position.instrument_id.venue;
2033        let venue_positions = self.index.venue_positions.entry(venue).or_default();
2034        venue_positions.insert(position.id);
2035
2036        // Index: InstrumentId -> AHashSet
2037        let instrument_id = position.instrument_id;
2038        let instrument_positions = self
2039            .index
2040            .instrument_positions
2041            .entry(instrument_id)
2042            .or_default();
2043        instrument_positions.insert(position.id);
2044
2045        // Index: AccountId -> AHashSet<PositionId>
2046        self.index
2047            .account_positions
2048            .entry(position.account_id)
2049            .or_default()
2050            .insert(position.id);
2051
2052        if let Some(database) = &mut self.database {
2053            database.add_position(position)?;
2054            // TODO: Implement position snapshots
2055            // if self.snapshot_positions {
2056            //     database.snapshot_position_state(
2057            //         position,
2058            //         position.ts_last,
2059            //         self.calculate_unrealized_pnl(&position),
2060            //     )?;
2061            // }
2062        }
2063
2064        Ok(())
2065    }
2066
2067    /// Updates the `account` in the cache.
2068    ///
2069    /// # Errors
2070    ///
2071    /// Returns an error if updating the account in the database fails.
2072    pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2073        let account_id = account.id();
2074        self.accounts.insert(account_id, account.clone());
2075
2076        if let Some(database) = &mut self.database {
2077            database.update_account(account)?;
2078        }
2079        Ok(())
2080    }
2081
2082    /// Updates the `order` in the cache.
2083    ///
2084    /// # Errors
2085    ///
2086    /// Returns an error if updating the order in the database fails.
2087    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2088        let client_order_id = order.client_order_id();
2089
2090        if order.is_active_local() {
2091            self.index.orders_active_local.insert(client_order_id);
2092        } else {
2093            self.index.orders_active_local.remove(&client_order_id);
2094        }
2095
2096        // Update venue order ID
2097        if let Some(venue_order_id) = order.venue_order_id() {
2098            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2099            // venues which use a cancel+replace update strategy.
2100            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2101                // TODO: If the last event was `OrderUpdated` then overwrite should be true
2102                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2103            }
2104        }
2105
2106        // Update in-flight state
2107        if order.is_inflight() {
2108            self.index.orders_inflight.insert(client_order_id);
2109        } else {
2110            self.index.orders_inflight.remove(&client_order_id);
2111        }
2112
2113        // Update open/closed state
2114        if order.is_open() {
2115            self.index.orders_closed.remove(&client_order_id);
2116            self.index.orders_open.insert(client_order_id);
2117        } else if order.is_closed() {
2118            self.index.orders_open.remove(&client_order_id);
2119            self.index.orders_pending_cancel.remove(&client_order_id);
2120            self.index.orders_closed.insert(client_order_id);
2121        }
2122
2123        // Update emulation index
2124        if let Some(emulation_trigger) = order.emulation_trigger()
2125            && emulation_trigger != TriggerType::NoTrigger
2126            && !order.is_closed()
2127        {
2128            self.index.orders_emulated.insert(client_order_id);
2129        } else {
2130            self.index.orders_emulated.remove(&client_order_id);
2131        }
2132
2133        // Update account orders index when account_id becomes available
2134        if let Some(account_id) = order.account_id() {
2135            self.index
2136                .account_orders
2137                .entry(account_id)
2138                .or_default()
2139                .insert(client_order_id);
2140        }
2141
2142        // Update own book
2143        if self.own_order_book(&order.instrument_id()).is_some()
2144            && should_handle_own_book_order(order)
2145        {
2146            self.update_own_order_book(order);
2147        }
2148
2149        if let Some(database) = &mut self.database {
2150            database.update_order(order.last_event())?;
2151            // TODO: Implement order snapshots
2152            // if self.snapshot_orders {
2153            //     database.snapshot_order_state(order)?;
2154            // }
2155        }
2156
2157        // update the order in the cache
2158        self.orders.insert(client_order_id, order.clone());
2159
2160        Ok(())
2161    }
2162
2163    /// Updates the `order` as pending cancel locally.
2164    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2165        self.index
2166            .orders_pending_cancel
2167            .insert(order.client_order_id());
2168    }
2169
2170    /// Updates the `position` in the cache.
2171    ///
2172    /// # Errors
2173    ///
2174    /// Returns an error if updating the position in the database fails.
2175    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2176        // Update open/closed state
2177
2178        if position.is_open() {
2179            self.index.positions_open.insert(position.id);
2180            self.index.positions_closed.remove(&position.id);
2181        } else {
2182            self.index.positions_closed.insert(position.id);
2183            self.index.positions_open.remove(&position.id);
2184        }
2185
2186        if let Some(database) = &mut self.database {
2187            database.update_position(position)?;
2188            // TODO: Implement order snapshots
2189            // if self.snapshot_orders {
2190            //     database.snapshot_order_state(order)?;
2191            // }
2192        }
2193
2194        self.positions.insert(position.id, position.clone());
2195
2196        Ok(())
2197    }
2198
2199    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2200    /// serializing it, and storing it in the position snapshots.
2201    ///
2202    /// # Errors
2203    ///
2204    /// Returns an error if serializing or storing the position snapshot fails.
2205    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2206        let position_id = position.id;
2207
2208        let mut copied_position = position.clone();
2209        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2210        copied_position.id = PositionId::new(new_id);
2211
2212        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2213        let position_serialized = serde_json::to_vec(&copied_position)?;
2214
2215        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2216        let new_snapshots = match snapshots {
2217            Some(existing_snapshots) => {
2218                let mut combined = existing_snapshots.to_vec();
2219                combined.extend(position_serialized);
2220                Bytes::from(combined)
2221            }
2222            None => Bytes::from(position_serialized),
2223        };
2224        self.position_snapshots.insert(position_id, new_snapshots);
2225
2226        log::debug!("Snapshot {copied_position}");
2227        Ok(())
2228    }
2229
2230    /// Creates a snapshot of the `position` state in the database.
2231    ///
2232    /// # Errors
2233    ///
2234    /// Returns an error if snapshotting the position state fails.
2235    pub fn snapshot_position_state(
2236        &mut self,
2237        position: &Position,
2238        // ts_snapshot: u64,
2239        // unrealized_pnl: Option<Money>,
2240        open_only: Option<bool>,
2241    ) -> anyhow::Result<()> {
2242        let open_only = open_only.unwrap_or(true);
2243
2244        if open_only && !position.is_open() {
2245            return Ok(());
2246        }
2247
2248        if let Some(database) = &mut self.database {
2249            database.snapshot_position_state(position).map_err(|e| {
2250                log::error!(
2251                    "Failed to snapshot position state for {}: {e:?}",
2252                    position.id
2253                );
2254                e
2255            })?;
2256        } else {
2257            log::warn!(
2258                "Cannot snapshot position state for {} (no database configured)",
2259                position.id
2260            );
2261        }
2262
2263        // Ok(())
2264        todo!()
2265    }
2266
2267    /// Gets the OMS type for the `position_id`.
2268    #[must_use]
2269    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2270        // Get OMS type from the index
2271        if self.index.position_strategy.contains_key(position_id) {
2272            // For now, we'll default to NETTING
2273            // TODO: Store and retrieve actual OMS type per position
2274            Some(OmsType::Netting)
2275        } else {
2276            None
2277        }
2278    }
2279
2280    /// Gets position snapshot bytes for the `position_id`.
2281    #[must_use]
2282    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2283        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2284    }
2285
2286    /// Gets position snapshot IDs for the `instrument_id`.
2287    #[must_use]
2288    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2289        // Get snapshot position IDs that match the instrument
2290        let mut result = AHashSet::new();
2291        for (position_id, _) in &self.position_snapshots {
2292            // Check if this position is for the requested instrument
2293            if let Some(position) = self.positions.get(position_id)
2294                && position.instrument_id == *instrument_id
2295            {
2296                result.insert(*position_id);
2297            }
2298        }
2299        result
2300    }
2301
2302    /// Snapshots the `order` state in the database.
2303    ///
2304    /// # Errors
2305    ///
2306    /// Returns an error if snapshotting the order state fails.
2307    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2308        let database = if let Some(database) = &self.database {
2309            database
2310        } else {
2311            log::warn!(
2312                "Cannot snapshot order state for {} (no database configured)",
2313                order.client_order_id()
2314            );
2315            return Ok(());
2316        };
2317
2318        database.snapshot_order_state(order)
2319    }
2320
2321    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2322
2323    fn build_order_query_filter_set(
2324        &self,
2325        venue: Option<&Venue>,
2326        instrument_id: Option<&InstrumentId>,
2327        strategy_id: Option<&StrategyId>,
2328        account_id: Option<&AccountId>,
2329    ) -> Option<AHashSet<ClientOrderId>> {
2330        let mut query: Option<AHashSet<ClientOrderId>> = None;
2331
2332        if let Some(venue) = venue {
2333            query = Some(
2334                self.index
2335                    .venue_orders
2336                    .get(venue)
2337                    .cloned()
2338                    .unwrap_or_default(),
2339            );
2340        }
2341
2342        if let Some(instrument_id) = instrument_id {
2343            let instrument_orders = self
2344                .index
2345                .instrument_orders
2346                .get(instrument_id)
2347                .cloned()
2348                .unwrap_or_default();
2349
2350            if let Some(existing_query) = &mut query {
2351                *existing_query = existing_query
2352                    .intersection(&instrument_orders)
2353                    .copied()
2354                    .collect();
2355            } else {
2356                query = Some(instrument_orders);
2357            }
2358        }
2359
2360        if let Some(strategy_id) = strategy_id {
2361            let strategy_orders = self
2362                .index
2363                .strategy_orders
2364                .get(strategy_id)
2365                .cloned()
2366                .unwrap_or_default();
2367
2368            if let Some(existing_query) = &mut query {
2369                *existing_query = existing_query
2370                    .intersection(&strategy_orders)
2371                    .copied()
2372                    .collect();
2373            } else {
2374                query = Some(strategy_orders);
2375            }
2376        }
2377
2378        if let Some(account_id) = account_id {
2379            let account_orders = self
2380                .index
2381                .account_orders
2382                .get(account_id)
2383                .cloned()
2384                .unwrap_or_default();
2385
2386            if let Some(existing_query) = &mut query {
2387                *existing_query = existing_query
2388                    .intersection(&account_orders)
2389                    .copied()
2390                    .collect();
2391            } else {
2392                query = Some(account_orders);
2393            }
2394        }
2395
2396        query
2397    }
2398
2399    fn build_position_query_filter_set(
2400        &self,
2401        venue: Option<&Venue>,
2402        instrument_id: Option<&InstrumentId>,
2403        strategy_id: Option<&StrategyId>,
2404        account_id: Option<&AccountId>,
2405    ) -> Option<AHashSet<PositionId>> {
2406        let mut query: Option<AHashSet<PositionId>> = None;
2407
2408        if let Some(venue) = venue {
2409            query = Some(
2410                self.index
2411                    .venue_positions
2412                    .get(venue)
2413                    .cloned()
2414                    .unwrap_or_default(),
2415            );
2416        }
2417
2418        if let Some(instrument_id) = instrument_id {
2419            let instrument_positions = self
2420                .index
2421                .instrument_positions
2422                .get(instrument_id)
2423                .cloned()
2424                .unwrap_or_default();
2425
2426            if let Some(existing_query) = query {
2427                query = Some(
2428                    existing_query
2429                        .intersection(&instrument_positions)
2430                        .copied()
2431                        .collect(),
2432                );
2433            } else {
2434                query = Some(instrument_positions);
2435            }
2436        }
2437
2438        if let Some(strategy_id) = strategy_id {
2439            let strategy_positions = self
2440                .index
2441                .strategy_positions
2442                .get(strategy_id)
2443                .cloned()
2444                .unwrap_or_default();
2445
2446            if let Some(existing_query) = query {
2447                query = Some(
2448                    existing_query
2449                        .intersection(&strategy_positions)
2450                        .copied()
2451                        .collect(),
2452                );
2453            } else {
2454                query = Some(strategy_positions);
2455            }
2456        }
2457
2458        if let Some(account_id) = account_id {
2459            let account_positions = self
2460                .index
2461                .account_positions
2462                .get(account_id)
2463                .cloned()
2464                .unwrap_or_default();
2465
2466            if let Some(existing_query) = query {
2467                query = Some(
2468                    existing_query
2469                        .intersection(&account_positions)
2470                        .copied()
2471                        .collect(),
2472                );
2473            } else {
2474                query = Some(account_positions);
2475            }
2476        }
2477
2478        query
2479    }
2480
2481    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2482    ///
2483    /// # Panics
2484    ///
2485    /// Panics if any `client_order_id` in the set is not found in the cache.
2486    fn get_orders_for_ids(
2487        &self,
2488        client_order_ids: &AHashSet<ClientOrderId>,
2489        side: Option<OrderSide>,
2490    ) -> Vec<&OrderAny> {
2491        let side = side.unwrap_or(OrderSide::NoOrderSide);
2492        let mut orders = Vec::new();
2493
2494        for client_order_id in client_order_ids {
2495            let order = self
2496                .orders
2497                .get(client_order_id)
2498                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2499
2500            if side == OrderSide::NoOrderSide || side == order.order_side() {
2501                orders.push(order);
2502            }
2503        }
2504
2505        orders
2506    }
2507
2508    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2509    ///
2510    /// # Panics
2511    ///
2512    /// Panics if any `position_id` in the set is not found in the cache.
2513    fn get_positions_for_ids(
2514        &self,
2515        position_ids: &AHashSet<PositionId>,
2516        side: Option<PositionSide>,
2517    ) -> Vec<&Position> {
2518        let side = side.unwrap_or(PositionSide::NoPositionSide);
2519        let mut positions = Vec::new();
2520
2521        for position_id in position_ids {
2522            let position = self
2523                .positions
2524                .get(position_id)
2525                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2526
2527            if side == PositionSide::NoPositionSide || side == position.side {
2528                positions.push(position);
2529            }
2530        }
2531
2532        positions
2533    }
2534
2535    /// Returns the `ClientOrderId`s of all orders.
2536    #[must_use]
2537    pub fn client_order_ids(
2538        &self,
2539        venue: Option<&Venue>,
2540        instrument_id: Option<&InstrumentId>,
2541        strategy_id: Option<&StrategyId>,
2542        account_id: Option<&AccountId>,
2543    ) -> AHashSet<ClientOrderId> {
2544        let query =
2545            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2546        match query {
2547            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2548            None => self.index.orders.clone(),
2549        }
2550    }
2551
2552    /// Returns the `ClientOrderId`s of all open orders.
2553    #[must_use]
2554    pub fn client_order_ids_open(
2555        &self,
2556        venue: Option<&Venue>,
2557        instrument_id: Option<&InstrumentId>,
2558        strategy_id: Option<&StrategyId>,
2559        account_id: Option<&AccountId>,
2560    ) -> AHashSet<ClientOrderId> {
2561        let query =
2562            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2563        match query {
2564            Some(query) => self
2565                .index
2566                .orders_open
2567                .intersection(&query)
2568                .copied()
2569                .collect(),
2570            None => self.index.orders_open.clone(),
2571        }
2572    }
2573
2574    /// Returns the `ClientOrderId`s of all closed orders.
2575    #[must_use]
2576    pub fn client_order_ids_closed(
2577        &self,
2578        venue: Option<&Venue>,
2579        instrument_id: Option<&InstrumentId>,
2580        strategy_id: Option<&StrategyId>,
2581        account_id: Option<&AccountId>,
2582    ) -> AHashSet<ClientOrderId> {
2583        let query =
2584            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2585        match query {
2586            Some(query) => self
2587                .index
2588                .orders_closed
2589                .intersection(&query)
2590                .copied()
2591                .collect(),
2592            None => self.index.orders_closed.clone(),
2593        }
2594    }
2595
2596    /// Returns the `ClientOrderId`s of all locally active orders.
2597    ///
2598    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
2599    /// (a superset of emulated orders).
2600    #[must_use]
2601    pub fn client_order_ids_active_local(
2602        &self,
2603        venue: Option<&Venue>,
2604        instrument_id: Option<&InstrumentId>,
2605        strategy_id: Option<&StrategyId>,
2606        account_id: Option<&AccountId>,
2607    ) -> AHashSet<ClientOrderId> {
2608        let query =
2609            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2610        match query {
2611            Some(query) => self
2612                .index
2613                .orders_active_local
2614                .intersection(&query)
2615                .copied()
2616                .collect(),
2617            None => self.index.orders_active_local.clone(),
2618        }
2619    }
2620
2621    /// Returns the `ClientOrderId`s of all emulated orders.
2622    #[must_use]
2623    pub fn client_order_ids_emulated(
2624        &self,
2625        venue: Option<&Venue>,
2626        instrument_id: Option<&InstrumentId>,
2627        strategy_id: Option<&StrategyId>,
2628        account_id: Option<&AccountId>,
2629    ) -> AHashSet<ClientOrderId> {
2630        let query =
2631            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2632        match query {
2633            Some(query) => self
2634                .index
2635                .orders_emulated
2636                .intersection(&query)
2637                .copied()
2638                .collect(),
2639            None => self.index.orders_emulated.clone(),
2640        }
2641    }
2642
2643    /// Returns the `ClientOrderId`s of all in-flight orders.
2644    #[must_use]
2645    pub fn client_order_ids_inflight(
2646        &self,
2647        venue: Option<&Venue>,
2648        instrument_id: Option<&InstrumentId>,
2649        strategy_id: Option<&StrategyId>,
2650        account_id: Option<&AccountId>,
2651    ) -> AHashSet<ClientOrderId> {
2652        let query =
2653            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2654        match query {
2655            Some(query) => self
2656                .index
2657                .orders_inflight
2658                .intersection(&query)
2659                .copied()
2660                .collect(),
2661            None => self.index.orders_inflight.clone(),
2662        }
2663    }
2664
2665    /// Returns `PositionId`s of all positions.
2666    #[must_use]
2667    pub fn position_ids(
2668        &self,
2669        venue: Option<&Venue>,
2670        instrument_id: Option<&InstrumentId>,
2671        strategy_id: Option<&StrategyId>,
2672        account_id: Option<&AccountId>,
2673    ) -> AHashSet<PositionId> {
2674        let query =
2675            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2676        match query {
2677            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2678            None => self.index.positions.clone(),
2679        }
2680    }
2681
2682    /// Returns the `PositionId`s of all open positions.
2683    #[must_use]
2684    pub fn position_open_ids(
2685        &self,
2686        venue: Option<&Venue>,
2687        instrument_id: Option<&InstrumentId>,
2688        strategy_id: Option<&StrategyId>,
2689        account_id: Option<&AccountId>,
2690    ) -> AHashSet<PositionId> {
2691        let query =
2692            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2693        match query {
2694            Some(query) => self
2695                .index
2696                .positions_open
2697                .intersection(&query)
2698                .copied()
2699                .collect(),
2700            None => self.index.positions_open.clone(),
2701        }
2702    }
2703
2704    /// Returns the `PositionId`s of all closed positions.
2705    #[must_use]
2706    pub fn position_closed_ids(
2707        &self,
2708        venue: Option<&Venue>,
2709        instrument_id: Option<&InstrumentId>,
2710        strategy_id: Option<&StrategyId>,
2711        account_id: Option<&AccountId>,
2712    ) -> AHashSet<PositionId> {
2713        let query =
2714            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2715        match query {
2716            Some(query) => self
2717                .index
2718                .positions_closed
2719                .intersection(&query)
2720                .copied()
2721                .collect(),
2722            None => self.index.positions_closed.clone(),
2723        }
2724    }
2725
2726    /// Returns the `ComponentId`s of all actors.
2727    #[must_use]
2728    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2729        self.index.actors.clone()
2730    }
2731
2732    /// Returns the `StrategyId`s of all strategies.
2733    #[must_use]
2734    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2735        self.index.strategies.clone()
2736    }
2737
2738    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2739    #[must_use]
2740    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2741        self.index.exec_algorithms.clone()
2742    }
2743
2744    // -- ORDER QUERIES ---------------------------------------------------------------------------
2745
2746    /// Gets a reference to the order with the `client_order_id` (if found).
2747    #[must_use]
2748    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2749        self.orders.get(client_order_id)
2750    }
2751
2752    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
2753    #[must_use]
2754    pub fn orders_for_ids(
2755        &self,
2756        client_order_ids: &[ClientOrderId],
2757        context: &dyn Display,
2758    ) -> Vec<OrderAny> {
2759        let mut orders = Vec::with_capacity(client_order_ids.len());
2760        for id in client_order_ids {
2761            match self.orders.get(id) {
2762                Some(order) => orders.push(order.clone()),
2763                None => log::error!("Order {id} not found in cache for {context}"),
2764            }
2765        }
2766        orders
2767    }
2768
2769    /// Gets a reference to the order with the `client_order_id` (if found).
2770    #[must_use]
2771    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2772        self.orders.get_mut(client_order_id)
2773    }
2774
2775    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2776    #[must_use]
2777    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2778        self.index.venue_order_ids.get(venue_order_id)
2779    }
2780
2781    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2782    #[must_use]
2783    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2784        self.index.client_order_ids.get(client_order_id)
2785    }
2786
2787    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2788    #[must_use]
2789    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2790        self.index.order_client.get(client_order_id)
2791    }
2792
2793    /// Returns references to all orders matching the optional filter parameters.
2794    #[must_use]
2795    pub fn orders(
2796        &self,
2797        venue: Option<&Venue>,
2798        instrument_id: Option<&InstrumentId>,
2799        strategy_id: Option<&StrategyId>,
2800        account_id: Option<&AccountId>,
2801        side: Option<OrderSide>,
2802    ) -> Vec<&OrderAny> {
2803        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2804        self.get_orders_for_ids(&client_order_ids, side)
2805    }
2806
2807    /// Returns references to all open orders matching the optional filter parameters.
2808    #[must_use]
2809    pub fn orders_open(
2810        &self,
2811        venue: Option<&Venue>,
2812        instrument_id: Option<&InstrumentId>,
2813        strategy_id: Option<&StrategyId>,
2814        account_id: Option<&AccountId>,
2815        side: Option<OrderSide>,
2816    ) -> Vec<&OrderAny> {
2817        let client_order_ids =
2818            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2819        self.get_orders_for_ids(&client_order_ids, side)
2820    }
2821
2822    /// Returns references to all closed orders matching the optional filter parameters.
2823    #[must_use]
2824    pub fn orders_closed(
2825        &self,
2826        venue: Option<&Venue>,
2827        instrument_id: Option<&InstrumentId>,
2828        strategy_id: Option<&StrategyId>,
2829        account_id: Option<&AccountId>,
2830        side: Option<OrderSide>,
2831    ) -> Vec<&OrderAny> {
2832        let client_order_ids =
2833            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2834        self.get_orders_for_ids(&client_order_ids, side)
2835    }
2836
2837    /// Returns references to all locally active orders matching the optional filter parameters.
2838    ///
2839    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
2840    /// (a superset of emulated orders).
2841    #[must_use]
2842    pub fn orders_active_local(
2843        &self,
2844        venue: Option<&Venue>,
2845        instrument_id: Option<&InstrumentId>,
2846        strategy_id: Option<&StrategyId>,
2847        account_id: Option<&AccountId>,
2848        side: Option<OrderSide>,
2849    ) -> Vec<&OrderAny> {
2850        let client_order_ids =
2851            self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
2852        self.get_orders_for_ids(&client_order_ids, side)
2853    }
2854
2855    /// Returns references to all emulated orders matching the optional filter parameters.
2856    #[must_use]
2857    pub fn orders_emulated(
2858        &self,
2859        venue: Option<&Venue>,
2860        instrument_id: Option<&InstrumentId>,
2861        strategy_id: Option<&StrategyId>,
2862        account_id: Option<&AccountId>,
2863        side: Option<OrderSide>,
2864    ) -> Vec<&OrderAny> {
2865        let client_order_ids =
2866            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2867        self.get_orders_for_ids(&client_order_ids, side)
2868    }
2869
2870    /// Returns references to all in-flight orders matching the optional filter parameters.
2871    #[must_use]
2872    pub fn orders_inflight(
2873        &self,
2874        venue: Option<&Venue>,
2875        instrument_id: Option<&InstrumentId>,
2876        strategy_id: Option<&StrategyId>,
2877        account_id: Option<&AccountId>,
2878        side: Option<OrderSide>,
2879    ) -> Vec<&OrderAny> {
2880        let client_order_ids =
2881            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2882        self.get_orders_for_ids(&client_order_ids, side)
2883    }
2884
2885    /// Returns references to all orders for the `position_id`.
2886    #[must_use]
2887    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2888        let client_order_ids = self.index.position_orders.get(position_id);
2889        match client_order_ids {
2890            Some(client_order_ids) => {
2891                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2892            }
2893            None => Vec::new(),
2894        }
2895    }
2896
2897    /// Returns whether an order with the `client_order_id` exists.
2898    #[must_use]
2899    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2900        self.index.orders.contains(client_order_id)
2901    }
2902
2903    /// Returns whether an order with the `client_order_id` is open.
2904    #[must_use]
2905    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2906        self.index.orders_open.contains(client_order_id)
2907    }
2908
2909    /// Returns whether an order with the `client_order_id` is closed.
2910    #[must_use]
2911    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2912        self.index.orders_closed.contains(client_order_id)
2913    }
2914
2915    /// Returns whether an order with the `client_order_id` is locally active.
2916    ///
2917    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
2918    /// (a superset of emulated orders).
2919    #[must_use]
2920    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
2921        self.index.orders_active_local.contains(client_order_id)
2922    }
2923
2924    /// Returns whether an order with the `client_order_id` is emulated.
2925    #[must_use]
2926    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2927        self.index.orders_emulated.contains(client_order_id)
2928    }
2929
2930    /// Returns whether an order with the `client_order_id` is in-flight.
2931    #[must_use]
2932    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2933        self.index.orders_inflight.contains(client_order_id)
2934    }
2935
2936    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2937    #[must_use]
2938    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2939        self.index.orders_pending_cancel.contains(client_order_id)
2940    }
2941
2942    /// Returns the count of all open orders.
2943    #[must_use]
2944    pub fn orders_open_count(
2945        &self,
2946        venue: Option<&Venue>,
2947        instrument_id: Option<&InstrumentId>,
2948        strategy_id: Option<&StrategyId>,
2949        account_id: Option<&AccountId>,
2950        side: Option<OrderSide>,
2951    ) -> usize {
2952        self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2953            .len()
2954    }
2955
2956    /// Returns the count of all closed orders.
2957    #[must_use]
2958    pub fn orders_closed_count(
2959        &self,
2960        venue: Option<&Venue>,
2961        instrument_id: Option<&InstrumentId>,
2962        strategy_id: Option<&StrategyId>,
2963        account_id: Option<&AccountId>,
2964        side: Option<OrderSide>,
2965    ) -> usize {
2966        self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2967            .len()
2968    }
2969
2970    /// Returns the count of all locally active orders.
2971    ///
2972    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
2973    /// (a superset of emulated orders).
2974    #[must_use]
2975    pub fn orders_active_local_count(
2976        &self,
2977        venue: Option<&Venue>,
2978        instrument_id: Option<&InstrumentId>,
2979        strategy_id: Option<&StrategyId>,
2980        account_id: Option<&AccountId>,
2981        side: Option<OrderSide>,
2982    ) -> usize {
2983        self.orders_active_local(venue, instrument_id, strategy_id, account_id, side)
2984            .len()
2985    }
2986
2987    /// Returns the count of all emulated orders.
2988    #[must_use]
2989    pub fn orders_emulated_count(
2990        &self,
2991        venue: Option<&Venue>,
2992        instrument_id: Option<&InstrumentId>,
2993        strategy_id: Option<&StrategyId>,
2994        account_id: Option<&AccountId>,
2995        side: Option<OrderSide>,
2996    ) -> usize {
2997        self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2998            .len()
2999    }
3000
3001    /// Returns the count of all in-flight orders.
3002    #[must_use]
3003    pub fn orders_inflight_count(
3004        &self,
3005        venue: Option<&Venue>,
3006        instrument_id: Option<&InstrumentId>,
3007        strategy_id: Option<&StrategyId>,
3008        account_id: Option<&AccountId>,
3009        side: Option<OrderSide>,
3010    ) -> usize {
3011        self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
3012            .len()
3013    }
3014
3015    /// Returns the count of all orders.
3016    #[must_use]
3017    pub fn orders_total_count(
3018        &self,
3019        venue: Option<&Venue>,
3020        instrument_id: Option<&InstrumentId>,
3021        strategy_id: Option<&StrategyId>,
3022        account_id: Option<&AccountId>,
3023        side: Option<OrderSide>,
3024    ) -> usize {
3025        self.orders(venue, instrument_id, strategy_id, account_id, side)
3026            .len()
3027    }
3028
3029    /// Returns the order list for the `order_list_id`.
3030    #[must_use]
3031    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
3032        self.order_lists.get(order_list_id)
3033    }
3034
3035    /// Returns all order lists matching the optional filter parameters.
3036    #[must_use]
3037    pub fn order_lists(
3038        &self,
3039        venue: Option<&Venue>,
3040        instrument_id: Option<&InstrumentId>,
3041        strategy_id: Option<&StrategyId>,
3042        account_id: Option<&AccountId>,
3043    ) -> Vec<&OrderList> {
3044        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
3045
3046        if let Some(venue) = venue {
3047            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
3048        }
3049
3050        if let Some(instrument_id) = instrument_id {
3051            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
3052        }
3053
3054        if let Some(strategy_id) = strategy_id {
3055            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
3056        }
3057
3058        if let Some(account_id) = account_id {
3059            order_lists.retain(|ol| {
3060                ol.client_order_ids.iter().any(|client_order_id| {
3061                    self.orders
3062                        .get(client_order_id)
3063                        .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
3064                })
3065            });
3066        }
3067
3068        order_lists
3069    }
3070
3071    /// Returns whether an order list with the `order_list_id` exists.
3072    #[must_use]
3073    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
3074        self.order_lists.contains_key(order_list_id)
3075    }
3076
3077    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
3078
3079    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
3080    /// optional filter parameters.
3081    #[must_use]
3082    pub fn orders_for_exec_algorithm(
3083        &self,
3084        exec_algorithm_id: &ExecAlgorithmId,
3085        venue: Option<&Venue>,
3086        instrument_id: Option<&InstrumentId>,
3087        strategy_id: Option<&StrategyId>,
3088        account_id: Option<&AccountId>,
3089        side: Option<OrderSide>,
3090    ) -> Vec<&OrderAny> {
3091        let query =
3092            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
3093        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
3094
3095        if let Some(query) = query
3096            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
3097        {
3098            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
3099        }
3100
3101        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
3102            self.get_orders_for_ids(exec_algorithm_order_ids, side)
3103        } else {
3104            Vec::new()
3105        }
3106    }
3107
3108    /// Returns references to all orders with the `exec_spawn_id`.
3109    #[must_use]
3110    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
3111        self.get_orders_for_ids(
3112            self.index
3113                .exec_spawn_orders
3114                .get(exec_spawn_id)
3115                .unwrap_or(&AHashSet::new()),
3116            None,
3117        )
3118    }
3119
3120    /// Returns the total order quantity for the `exec_spawn_id`.
3121    #[must_use]
3122    pub fn exec_spawn_total_quantity(
3123        &self,
3124        exec_spawn_id: &ClientOrderId,
3125        active_only: bool,
3126    ) -> Option<Quantity> {
3127        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3128
3129        let mut total_quantity: Option<Quantity> = None;
3130
3131        for spawn_order in exec_spawn_orders {
3132            if active_only && spawn_order.is_closed() {
3133                continue;
3134            }
3135
3136            match total_quantity.as_mut() {
3137                Some(total) => *total = *total + spawn_order.quantity(),
3138                None => total_quantity = Some(spawn_order.quantity()),
3139            }
3140        }
3141
3142        total_quantity
3143    }
3144
3145    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
3146    #[must_use]
3147    pub fn exec_spawn_total_filled_qty(
3148        &self,
3149        exec_spawn_id: &ClientOrderId,
3150        active_only: bool,
3151    ) -> Option<Quantity> {
3152        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3153
3154        let mut total_quantity: Option<Quantity> = None;
3155
3156        for spawn_order in exec_spawn_orders {
3157            if active_only && spawn_order.is_closed() {
3158                continue;
3159            }
3160
3161            match total_quantity.as_mut() {
3162                Some(total) => *total = *total + spawn_order.filled_qty(),
3163                None => total_quantity = Some(spawn_order.filled_qty()),
3164            }
3165        }
3166
3167        total_quantity
3168    }
3169
3170    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
3171    #[must_use]
3172    pub fn exec_spawn_total_leaves_qty(
3173        &self,
3174        exec_spawn_id: &ClientOrderId,
3175        active_only: bool,
3176    ) -> Option<Quantity> {
3177        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3178
3179        let mut total_quantity: Option<Quantity> = None;
3180
3181        for spawn_order in exec_spawn_orders {
3182            if active_only && spawn_order.is_closed() {
3183                continue;
3184            }
3185
3186            match total_quantity.as_mut() {
3187                Some(total) => *total = *total + spawn_order.leaves_qty(),
3188                None => total_quantity = Some(spawn_order.leaves_qty()),
3189            }
3190        }
3191
3192        total_quantity
3193    }
3194
3195    // -- POSITION QUERIES ------------------------------------------------------------------------
3196
3197    /// Returns a reference to the position with the `position_id` (if found).
3198    #[must_use]
3199    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3200        self.positions.get(position_id)
3201    }
3202
3203    /// Returns a reference to the position for the `client_order_id` (if found).
3204    #[must_use]
3205    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3206        self.index
3207            .order_position
3208            .get(client_order_id)
3209            .and_then(|position_id| self.positions.get(position_id))
3210    }
3211
3212    /// Returns a reference to the position ID for the `client_order_id` (if found).
3213    #[must_use]
3214    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3215        self.index.order_position.get(client_order_id)
3216    }
3217
3218    /// Returns a reference to all positions matching the optional filter parameters.
3219    #[must_use]
3220    pub fn positions(
3221        &self,
3222        venue: Option<&Venue>,
3223        instrument_id: Option<&InstrumentId>,
3224        strategy_id: Option<&StrategyId>,
3225        account_id: Option<&AccountId>,
3226        side: Option<PositionSide>,
3227    ) -> Vec<&Position> {
3228        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3229        self.get_positions_for_ids(&position_ids, side)
3230    }
3231
3232    /// Returns a reference to all open positions matching the optional filter parameters.
3233    #[must_use]
3234    pub fn positions_open(
3235        &self,
3236        venue: Option<&Venue>,
3237        instrument_id: Option<&InstrumentId>,
3238        strategy_id: Option<&StrategyId>,
3239        account_id: Option<&AccountId>,
3240        side: Option<PositionSide>,
3241    ) -> Vec<&Position> {
3242        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3243        self.get_positions_for_ids(&position_ids, side)
3244    }
3245
3246    /// Returns a reference to all closed positions matching the optional filter parameters.
3247    #[must_use]
3248    pub fn positions_closed(
3249        &self,
3250        venue: Option<&Venue>,
3251        instrument_id: Option<&InstrumentId>,
3252        strategy_id: Option<&StrategyId>,
3253        account_id: Option<&AccountId>,
3254        side: Option<PositionSide>,
3255    ) -> Vec<&Position> {
3256        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3257        self.get_positions_for_ids(&position_ids, side)
3258    }
3259
3260    /// Returns whether a position with the `position_id` exists.
3261    #[must_use]
3262    pub fn position_exists(&self, position_id: &PositionId) -> bool {
3263        self.index.positions.contains(position_id)
3264    }
3265
3266    /// Returns whether a position with the `position_id` is open.
3267    #[must_use]
3268    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3269        self.index.positions_open.contains(position_id)
3270    }
3271
3272    /// Returns whether a position with the `position_id` is closed.
3273    #[must_use]
3274    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3275        self.index.positions_closed.contains(position_id)
3276    }
3277
3278    /// Returns the count of all open positions.
3279    #[must_use]
3280    pub fn positions_open_count(
3281        &self,
3282        venue: Option<&Venue>,
3283        instrument_id: Option<&InstrumentId>,
3284        strategy_id: Option<&StrategyId>,
3285        account_id: Option<&AccountId>,
3286        side: Option<PositionSide>,
3287    ) -> usize {
3288        self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3289            .len()
3290    }
3291
3292    /// Returns the count of all closed positions.
3293    #[must_use]
3294    pub fn positions_closed_count(
3295        &self,
3296        venue: Option<&Venue>,
3297        instrument_id: Option<&InstrumentId>,
3298        strategy_id: Option<&StrategyId>,
3299        account_id: Option<&AccountId>,
3300        side: Option<PositionSide>,
3301    ) -> usize {
3302        self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3303            .len()
3304    }
3305
3306    /// Returns the count of all positions.
3307    #[must_use]
3308    pub fn positions_total_count(
3309        &self,
3310        venue: Option<&Venue>,
3311        instrument_id: Option<&InstrumentId>,
3312        strategy_id: Option<&StrategyId>,
3313        account_id: Option<&AccountId>,
3314        side: Option<PositionSide>,
3315    ) -> usize {
3316        self.positions(venue, instrument_id, strategy_id, account_id, side)
3317            .len()
3318    }
3319
3320    // -- STRATEGY QUERIES ------------------------------------------------------------------------
3321
3322    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
3323    #[must_use]
3324    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3325        self.index.order_strategy.get(client_order_id)
3326    }
3327
3328    /// Gets a reference to the strategy ID for the `position_id` (if found).
3329    #[must_use]
3330    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3331        self.index.position_strategy.get(position_id)
3332    }
3333
3334    // -- GENERAL ---------------------------------------------------------------------------------
3335
3336    /// Gets a reference to the general value for the `key` (if found).
3337    ///
3338    /// # Errors
3339    ///
3340    /// Returns an error if the `key` is invalid.
3341    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3342        check_valid_string_ascii(key, stringify!(key))?;
3343
3344        Ok(self.general.get(key))
3345    }
3346
3347    // -- DATA QUERIES ----------------------------------------------------------------------------
3348
3349    /// Returns the price for the `instrument_id` and `price_type` (if found).
3350    #[must_use]
3351    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3352        match price_type {
3353            PriceType::Bid => self
3354                .quotes
3355                .get(instrument_id)
3356                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3357            PriceType::Ask => self
3358                .quotes
3359                .get(instrument_id)
3360                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3361            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3362                quotes.front().map(|quote| {
3363                    Price::new(
3364                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3365                        quote.bid_price.precision + 1,
3366                    )
3367                })
3368            }),
3369            PriceType::Last => self
3370                .trades
3371                .get(instrument_id)
3372                .and_then(|trades| trades.front().map(|trade| trade.price)),
3373            PriceType::Mark => self
3374                .mark_prices
3375                .get(instrument_id)
3376                .and_then(|marks| marks.front().map(|mark| mark.value)),
3377        }
3378    }
3379
3380    /// Gets all quotes for the `instrument_id`.
3381    #[must_use]
3382    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3383        self.quotes
3384            .get(instrument_id)
3385            .map(|quotes| quotes.iter().copied().collect())
3386    }
3387
3388    /// Gets all trades for the `instrument_id`.
3389    #[must_use]
3390    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3391        self.trades
3392            .get(instrument_id)
3393            .map(|trades| trades.iter().copied().collect())
3394    }
3395
3396    /// Gets all mark price updates for the `instrument_id`.
3397    #[must_use]
3398    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3399        self.mark_prices
3400            .get(instrument_id)
3401            .map(|mark_prices| mark_prices.iter().copied().collect())
3402    }
3403
3404    /// Gets all index price updates for the `instrument_id`.
3405    #[must_use]
3406    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3407        self.index_prices
3408            .get(instrument_id)
3409            .map(|index_prices| index_prices.iter().copied().collect())
3410    }
3411
3412    /// Gets all funding rate updates for the `instrument_id`.
3413    #[must_use]
3414    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3415        self.funding_rates
3416            .get(instrument_id)
3417            .map(|funding_rates| funding_rates.iter().copied().collect())
3418    }
3419
3420    /// Gets all bars for the `bar_type`.
3421    #[must_use]
3422    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3423        self.bars
3424            .get(bar_type)
3425            .map(|bars| bars.iter().copied().collect())
3426    }
3427
3428    /// Gets a reference to the order book for the `instrument_id`.
3429    #[must_use]
3430    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3431        self.books.get(instrument_id)
3432    }
3433
3434    /// Gets a reference to the order book for the `instrument_id`.
3435    #[must_use]
3436    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3437        self.books.get_mut(instrument_id)
3438    }
3439
3440    /// Gets a reference to the own order book for the `instrument_id`.
3441    #[must_use]
3442    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3443        self.own_books.get(instrument_id)
3444    }
3445
3446    /// Gets a reference to the own order book for the `instrument_id`.
3447    #[must_use]
3448    pub fn own_order_book_mut(
3449        &mut self,
3450        instrument_id: &InstrumentId,
3451    ) -> Option<&mut OwnOrderBook> {
3452        self.own_books.get_mut(instrument_id)
3453    }
3454
3455    /// Gets a reference to the latest quote for the `instrument_id`.
3456    #[must_use]
3457    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3458        self.quotes
3459            .get(instrument_id)
3460            .and_then(|quotes| quotes.front())
3461    }
3462
3463    /// Gets a reference to the quote at `index` for the `instrument_id`.
3464    ///
3465    /// Index 0 is the most recent.
3466    #[must_use]
3467    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
3468        self.quotes
3469            .get(instrument_id)
3470            .and_then(|quotes| quotes.get(index))
3471    }
3472
3473    /// Gets a reference to the latest trade for the `instrument_id`.
3474    #[must_use]
3475    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3476        self.trades
3477            .get(instrument_id)
3478            .and_then(|trades| trades.front())
3479    }
3480
3481    /// Gets a reference to the trade at `index` for the `instrument_id`.
3482    ///
3483    /// Index 0 is the most recent.
3484    #[must_use]
3485    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
3486        self.trades
3487            .get(instrument_id)
3488            .and_then(|trades| trades.get(index))
3489    }
3490
3491    /// Gets a reference to the latest mark price update for the `instrument_id`.
3492    #[must_use]
3493    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3494        self.mark_prices
3495            .get(instrument_id)
3496            .and_then(|mark_prices| mark_prices.front())
3497    }
3498
3499    /// Gets a reference to the latest index price update for the `instrument_id`.
3500    #[must_use]
3501    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3502        self.index_prices
3503            .get(instrument_id)
3504            .and_then(|index_prices| index_prices.front())
3505    }
3506
3507    /// Gets a reference to the latest funding rate update for the `instrument_id`.
3508    #[must_use]
3509    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3510        self.funding_rates
3511            .get(instrument_id)
3512            .and_then(|funding_rates| funding_rates.front())
3513    }
3514
3515    /// Gets a reference to the latest bar for the `bar_type`.
3516    #[must_use]
3517    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3518        self.bars.get(bar_type).and_then(|bars| bars.front())
3519    }
3520
3521    /// Gets a reference to the bar at `index` for the `bar_type`.
3522    ///
3523    /// Index 0 is the most recent.
3524    #[must_use]
3525    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
3526        self.bars.get(bar_type).and_then(|bars| bars.get(index))
3527    }
3528
3529    /// Gets the order book update count for the `instrument_id`.
3530    #[must_use]
3531    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3532        self.books
3533            .get(instrument_id)
3534            .map_or(0, |book| book.update_count) as usize
3535    }
3536
3537    /// Gets the quote tick count for the `instrument_id`.
3538    #[must_use]
3539    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3540        self.quotes
3541            .get(instrument_id)
3542            .map_or(0, std::collections::VecDeque::len)
3543    }
3544
3545    /// Gets the trade tick count for the `instrument_id`.
3546    #[must_use]
3547    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3548        self.trades
3549            .get(instrument_id)
3550            .map_or(0, std::collections::VecDeque::len)
3551    }
3552
3553    /// Gets the bar count for the `instrument_id`.
3554    #[must_use]
3555    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3556        self.bars
3557            .get(bar_type)
3558            .map_or(0, std::collections::VecDeque::len)
3559    }
3560
3561    /// Returns whether the cache contains an order book for the `instrument_id`.
3562    #[must_use]
3563    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3564        self.books.contains_key(instrument_id)
3565    }
3566
3567    /// Returns whether the cache contains quotes for the `instrument_id`.
3568    #[must_use]
3569    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3570        self.quote_count(instrument_id) > 0
3571    }
3572
3573    /// Returns whether the cache contains trades for the `instrument_id`.
3574    #[must_use]
3575    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3576        self.trade_count(instrument_id) > 0
3577    }
3578
3579    /// Returns whether the cache contains bars for the `bar_type`.
3580    #[must_use]
3581    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3582        self.bar_count(bar_type) > 0
3583    }
3584
3585    #[must_use]
3586    pub fn get_xrate(
3587        &self,
3588        venue: Venue,
3589        from_currency: Currency,
3590        to_currency: Currency,
3591        price_type: PriceType,
3592    ) -> Option<f64> {
3593        if from_currency == to_currency {
3594            // When the source and target currencies are identical,
3595            // no conversion is needed; return an exchange rate of 1.0.
3596            return Some(1.0);
3597        }
3598
3599        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3600
3601        match get_exchange_rate(
3602            from_currency.code,
3603            to_currency.code,
3604            price_type,
3605            bid_quote,
3606            ask_quote,
3607        ) {
3608            Ok(rate) => rate,
3609            Err(e) => {
3610                log::error!("Failed to calculate xrate: {e}");
3611                None
3612            }
3613        }
3614    }
3615
3616    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3617        let mut bid_quotes = AHashMap::new();
3618        let mut ask_quotes = AHashMap::new();
3619
3620        for instrument_id in self.instruments.keys() {
3621            if instrument_id.venue != *venue {
3622                continue;
3623            }
3624
3625            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3626                if let Some(tick) = ticks.front() {
3627                    (tick.bid_price, tick.ask_price)
3628                } else {
3629                    continue; // Empty ticks vector
3630                }
3631            } else {
3632                let bid_bar = self
3633                    .bars
3634                    .iter()
3635                    .find(|(k, _)| {
3636                        k.instrument_id() == *instrument_id
3637                            && matches!(k.spec().price_type, PriceType::Bid)
3638                    })
3639                    .map(|(_, v)| v);
3640
3641                let ask_bar = self
3642                    .bars
3643                    .iter()
3644                    .find(|(k, _)| {
3645                        k.instrument_id() == *instrument_id
3646                            && matches!(k.spec().price_type, PriceType::Ask)
3647                    })
3648                    .map(|(_, v)| v);
3649
3650                match (bid_bar, ask_bar) {
3651                    (Some(bid), Some(ask)) => {
3652                        match (bid.front(), ask.front()) {
3653                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3654                            _ => {
3655                                // Empty bar VecDeques
3656                                continue;
3657                            }
3658                        }
3659                    }
3660                    _ => continue,
3661                }
3662            };
3663
3664            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3665            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3666        }
3667
3668        (bid_quotes, ask_quotes)
3669    }
3670
3671    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3672    #[must_use]
3673    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3674        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3675    }
3676
3677    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3678    ///
3679    /// # Panics
3680    ///
3681    /// Panics if `xrate` is not positive.
3682    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3683        assert!(xrate > 0.0, "xrate was zero");
3684        self.mark_xrates.insert((from_currency, to_currency), xrate);
3685        self.mark_xrates
3686            .insert((to_currency, from_currency), 1.0 / xrate);
3687    }
3688
3689    /// Clears the mark exchange rate for the given currency pair.
3690    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3691        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3692    }
3693
3694    /// Clears all mark exchange rates.
3695    pub fn clear_mark_xrates(&mut self) {
3696        self.mark_xrates.clear();
3697    }
3698
3699    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3700
3701    /// Returns a reference to the instrument for the `instrument_id` (if found).
3702    #[must_use]
3703    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3704        self.instruments.get(instrument_id)
3705    }
3706
3707    /// Returns references to all instrument IDs for the `venue`.
3708    #[must_use]
3709    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3710        match venue {
3711            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3712            None => self.instruments.keys().collect(),
3713        }
3714    }
3715
3716    /// Returns references to all instruments for the `venue`.
3717    #[must_use]
3718    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3719        self.instruments
3720            .values()
3721            .filter(|i| &i.id().venue == venue)
3722            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3723            .collect()
3724    }
3725
3726    /// Returns references to all bar types contained in the cache.
3727    #[must_use]
3728    pub fn bar_types(
3729        &self,
3730        instrument_id: Option<&InstrumentId>,
3731        price_type: Option<&PriceType>,
3732        aggregation_source: AggregationSource,
3733    ) -> Vec<&BarType> {
3734        let mut bar_types = self
3735            .bars
3736            .keys()
3737            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3738            .collect::<Vec<&BarType>>();
3739
3740        if let Some(instrument_id) = instrument_id {
3741            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3742        }
3743
3744        if let Some(price_type) = price_type {
3745            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3746        }
3747
3748        bar_types
3749    }
3750
3751    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3752
3753    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3754    #[must_use]
3755    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3756        self.synthetics.get(instrument_id)
3757    }
3758
3759    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3760    #[must_use]
3761    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3762        self.synthetics.keys().collect()
3763    }
3764
3765    /// Returns references to all synthetic instruments contained in the cache.
3766    #[must_use]
3767    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3768        self.synthetics.values().collect()
3769    }
3770
3771    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3772
3773    /// Returns a reference to the account for the `account_id` (if found).
3774    #[must_use]
3775    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3776        self.accounts.get(account_id)
3777    }
3778
3779    /// Returns a reference to the account for the `venue` (if found).
3780    #[must_use]
3781    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3782        self.index
3783            .venue_account
3784            .get(venue)
3785            .and_then(|account_id| self.accounts.get(account_id))
3786    }
3787
3788    /// Returns a reference to the account ID for the `venue` (if found).
3789    #[must_use]
3790    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3791        self.index.venue_account.get(venue)
3792    }
3793
3794    /// Returns references to all accounts for the `account_id`.
3795    #[must_use]
3796    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3797        self.accounts
3798            .values()
3799            .filter(|account| &account.id() == account_id)
3800            .collect()
3801    }
3802
3803    /// Updates the own order book with an order.
3804    ///
3805    /// This method adds, updates, or removes an order from the own order book
3806    /// based on the order's current state.
3807    ///
3808    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3809    /// represented in own books.
3810    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3811        if !order.has_price() {
3812            return;
3813        }
3814
3815        let instrument_id = order.instrument_id();
3816
3817        let own_book = self
3818            .own_books
3819            .entry(instrument_id)
3820            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3821
3822        let own_book_order = order.to_own_book_order();
3823
3824        if order.is_closed() {
3825            if let Err(e) = own_book.delete(own_book_order) {
3826                log::debug!(
3827                    "Failed to delete order {} from own book: {e}",
3828                    order.client_order_id(),
3829                );
3830            } else {
3831                log::debug!("Deleted order {} from own book", order.client_order_id());
3832            }
3833        } else {
3834            // Add or update the order in the own book
3835            if let Err(e) = own_book.update(own_book_order) {
3836                log::debug!(
3837                    "Failed to update order {} in own book: {e}; inserting instead",
3838                    order.client_order_id(),
3839                );
3840                own_book.add(own_book_order);
3841            }
3842            log::debug!("Updated order {} in own book", order.client_order_id());
3843        }
3844    }
3845
3846    /// Force removal of an order from own order books and clean up all indexes.
3847    ///
3848    /// This method is used when order event application fails and we need to ensure
3849    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3850    /// Replicates the index cleanup that update_order performs for closed orders.
3851    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3852        let order = match self.orders.get(client_order_id) {
3853            Some(order) => order,
3854            None => return,
3855        };
3856
3857        self.index.orders_open.remove(client_order_id);
3858        self.index.orders_pending_cancel.remove(client_order_id);
3859        self.index.orders_inflight.remove(client_order_id);
3860        self.index.orders_emulated.remove(client_order_id);
3861        self.index.orders_active_local.remove(client_order_id);
3862
3863        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3864            && order.has_price()
3865        {
3866            let own_book_order = order.to_own_book_order();
3867            if let Err(e) = own_book.delete(own_book_order) {
3868                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3869            } else {
3870                log::debug!("Force deleted {client_order_id} from own book");
3871            }
3872        }
3873
3874        self.index.orders_closed.insert(*client_order_id);
3875    }
3876
3877    /// Audit all own order books against open and inflight order indexes.
3878    ///
3879    /// Ensures closed orders are removed from own order books. This includes both
3880    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3881    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3882    /// during venue latency windows.
3883    pub fn audit_own_order_books(&mut self) {
3884        log::debug!("Starting own books audit");
3885        let start = std::time::Instant::now();
3886
3887        // Build union of open and inflight orders for audit,
3888        // this prevents false positives for SUBMITTED orders during venue latency.
3889        let valid_order_ids: AHashSet<ClientOrderId> = self
3890            .index
3891            .orders_open
3892            .union(&self.index.orders_inflight)
3893            .copied()
3894            .collect();
3895
3896        for own_book in self.own_books.values_mut() {
3897            own_book.audit_open_orders(&valid_order_ids);
3898        }
3899
3900        log::debug!("Completed own books audit in {:?}", start.elapsed());
3901    }
3902}