Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31    collections::VecDeque,
32    fmt::{Debug, Display},
33    time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; // Re-export
39use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42    UUID4, UnixNanos,
43    correctness::{
44        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45        check_valid_string_ascii,
46    },
47    datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50    accounts::{Account, AccountAny},
51    data::{
52        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, QuoteTick,
53        TradeTick, YieldCurveData,
54    },
55    enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType},
56    identifiers::{
57        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
58        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
59    },
60    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
61    orderbook::{
62        OrderBook,
63        own::{OwnOrderBook, should_handle_own_book_order},
64    },
65    orders::{Order, OrderAny, OrderList},
66    position::Position,
67    types::{Currency, Money, Price, Quantity},
68};
69use ustr::Ustr;
70
71use crate::xrate::get_exchange_rate;
72
73/// A common in-memory `Cache` for market and execution related data.
74#[cfg_attr(
75    feature = "python",
76    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
77)]
78pub struct Cache {
79    config: CacheConfig,
80    index: CacheIndex,
81    database: Option<Box<dyn CacheDatabaseAdapter>>,
82    general: AHashMap<String, Bytes>,
83    currencies: AHashMap<Ustr, Currency>,
84    instruments: AHashMap<InstrumentId, InstrumentAny>,
85    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
86    books: AHashMap<InstrumentId, OrderBook>,
87    own_books: AHashMap<InstrumentId, OwnOrderBook>,
88    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
89    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
90    mark_xrates: AHashMap<(Currency, Currency), f64>,
91    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
92    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
93    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
94    bars: AHashMap<BarType, VecDeque<Bar>>,
95    greeks: AHashMap<InstrumentId, GreeksData>,
96    yield_curves: AHashMap<String, YieldCurveData>,
97    accounts: AHashMap<AccountId, AccountAny>,
98    orders: AHashMap<ClientOrderId, OrderAny>,
99    order_lists: AHashMap<OrderListId, OrderList>,
100    positions: AHashMap<PositionId, Position>,
101    position_snapshots: AHashMap<PositionId, Bytes>,
102    #[cfg(feature = "defi")]
103    pub(crate) defi: crate::defi::cache::DefiCache,
104}
105
106impl Debug for Cache {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct(stringify!(Cache))
109            .field("config", &self.config)
110            .field("index", &self.index)
111            .field("general", &self.general)
112            .field("currencies", &self.currencies)
113            .field("instruments", &self.instruments)
114            .field("synthetics", &self.synthetics)
115            .field("books", &self.books)
116            .field("own_books", &self.own_books)
117            .field("quotes", &self.quotes)
118            .field("trades", &self.trades)
119            .field("mark_xrates", &self.mark_xrates)
120            .field("mark_prices", &self.mark_prices)
121            .field("index_prices", &self.index_prices)
122            .field("funding_rates", &self.funding_rates)
123            .field("bars", &self.bars)
124            .field("greeks", &self.greeks)
125            .field("yield_curves", &self.yield_curves)
126            .field("accounts", &self.accounts)
127            .field("orders", &self.orders)
128            .field("order_lists", &self.order_lists)
129            .field("positions", &self.positions)
130            .field("position_snapshots", &self.position_snapshots)
131            .finish()
132    }
133}
134
135impl Default for Cache {
136    /// Creates a new default [`Cache`] instance.
137    fn default() -> Self {
138        Self::new(Some(CacheConfig::default()), None)
139    }
140}
141
142impl Cache {
143    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
144    #[must_use]
145    /// # Note
146    ///
147    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
148    pub fn new(
149        config: Option<CacheConfig>,
150        database: Option<Box<dyn CacheDatabaseAdapter>>,
151    ) -> Self {
152        Self {
153            config: config.unwrap_or_default(),
154            index: CacheIndex::default(),
155            database,
156            general: AHashMap::new(),
157            currencies: AHashMap::new(),
158            instruments: AHashMap::new(),
159            synthetics: AHashMap::new(),
160            books: AHashMap::new(),
161            own_books: AHashMap::new(),
162            quotes: AHashMap::new(),
163            trades: AHashMap::new(),
164            mark_xrates: AHashMap::new(),
165            mark_prices: AHashMap::new(),
166            index_prices: AHashMap::new(),
167            funding_rates: AHashMap::new(),
168            bars: AHashMap::new(),
169            greeks: AHashMap::new(),
170            yield_curves: AHashMap::new(),
171            accounts: AHashMap::new(),
172            orders: AHashMap::new(),
173            order_lists: AHashMap::new(),
174            positions: AHashMap::new(),
175            position_snapshots: AHashMap::new(),
176            #[cfg(feature = "defi")]
177            defi: crate::defi::cache::DefiCache::default(),
178        }
179    }
180
181    /// Returns the cache instances memory address.
182    #[must_use]
183    pub fn memory_address(&self) -> String {
184        format!("{:?}", std::ptr::from_ref(self))
185    }
186
187    /// Sets the cache database adapter for persistence.
188    ///
189    /// This allows setting or replacing the database adapter after cache construction.
190    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
191        let type_name = std::any::type_name_of_val(&*database);
192        log::info!("Cache database adapter set: {type_name}");
193        self.database = Some(database);
194    }
195
196    // -- COMMANDS --------------------------------------------------------------------------------
197
198    /// Clears and reloads general entries from the database into the cache.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if loading general cache data fails.
203    pub fn cache_general(&mut self) -> anyhow::Result<()> {
204        self.general = match &mut self.database {
205            Some(db) => db.load()?,
206            None => AHashMap::new(),
207        };
208
209        log::info!(
210            "Cached {} general object(s) from database",
211            self.general.len()
212        );
213        Ok(())
214    }
215
216    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if loading all cache data fails.
221    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
222        let cache_map = match &self.database {
223            Some(db) => db.load_all().await?,
224            None => CacheMap::default(),
225        };
226
227        self.currencies = cache_map.currencies;
228        self.instruments = cache_map.instruments;
229        self.synthetics = cache_map.synthetics;
230        self.accounts = cache_map.accounts;
231        self.orders = cache_map.orders;
232        self.positions = cache_map.positions;
233        Ok(())
234    }
235
236    /// Clears and reloads the currency cache from the database.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if loading currencies cache fails.
241    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
242        self.currencies = match &mut self.database {
243            Some(db) => db.load_currencies().await?,
244            None => AHashMap::new(),
245        };
246
247        log::info!("Cached {} currencies from database", self.general.len());
248        Ok(())
249    }
250
251    /// Clears and reloads the instrument cache from the database.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if loading instruments cache fails.
256    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
257        self.instruments = match &mut self.database {
258            Some(db) => db.load_instruments().await?,
259            None => AHashMap::new(),
260        };
261
262        log::info!("Cached {} instruments from database", self.general.len());
263        Ok(())
264    }
265
266    /// Clears and reloads the synthetic instrument cache from the database.
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if loading synthetic instruments cache fails.
271    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
272        self.synthetics = match &mut self.database {
273            Some(db) => db.load_synthetics().await?,
274            None => AHashMap::new(),
275        };
276
277        log::info!(
278            "Cached {} synthetic instruments from database",
279            self.general.len()
280        );
281        Ok(())
282    }
283
284    /// Clears and reloads the account cache from the database.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if loading accounts cache fails.
289    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
290        self.accounts = match &mut self.database {
291            Some(db) => db.load_accounts().await?,
292            None => AHashMap::new(),
293        };
294
295        log::info!(
296            "Cached {} synthetic instruments from database",
297            self.general.len()
298        );
299        Ok(())
300    }
301
302    /// Clears and reloads the order cache from the database.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if loading orders cache fails.
307    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
308        self.orders = match &mut self.database {
309            Some(db) => db.load_orders().await?,
310            None => AHashMap::new(),
311        };
312
313        log::info!("Cached {} orders from database", self.general.len());
314        Ok(())
315    }
316
317    /// Clears and reloads the position cache from the database.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if loading positions cache fails.
322    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
323        self.positions = match &mut self.database {
324            Some(db) => db.load_positions().await?,
325            None => AHashMap::new(),
326        };
327
328        log::info!("Cached {} positions from database", self.general.len());
329        Ok(())
330    }
331
332    /// Clears the current cache index and re-build.
333    pub fn build_index(&mut self) {
334        log::debug!("Building index");
335
336        // Index accounts
337        for account_id in self.accounts.keys() {
338            self.index
339                .venue_account
340                .insert(account_id.get_issuer(), *account_id);
341        }
342
343        // Index orders
344        for (client_order_id, order) in &self.orders {
345            let instrument_id = order.instrument_id();
346            let venue = instrument_id.venue;
347            let strategy_id = order.strategy_id();
348
349            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
350            self.index
351                .venue_orders
352                .entry(venue)
353                .or_default()
354                .insert(*client_order_id);
355
356            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
357            if let Some(venue_order_id) = order.venue_order_id() {
358                self.index
359                    .venue_order_ids
360                    .insert(venue_order_id, *client_order_id);
361            }
362
363            // 3: Build index.order_position -> {ClientOrderId, PositionId}
364            if let Some(position_id) = order.position_id() {
365                self.index
366                    .order_position
367                    .insert(*client_order_id, position_id);
368            }
369
370            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
371            self.index
372                .order_strategy
373                .insert(*client_order_id, order.strategy_id());
374
375            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
376            self.index
377                .instrument_orders
378                .entry(instrument_id)
379                .or_default()
380                .insert(*client_order_id);
381
382            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
383            self.index
384                .strategy_orders
385                .entry(strategy_id)
386                .or_default()
387                .insert(*client_order_id);
388
389            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
390            if let Some(account_id) = order.account_id() {
391                self.index
392                    .account_orders
393                    .entry(account_id)
394                    .or_default()
395                    .insert(*client_order_id);
396            }
397
398            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
399            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
400                self.index
401                    .exec_algorithm_orders
402                    .entry(exec_algorithm_id)
403                    .or_default()
404                    .insert(*client_order_id);
405            }
406
407            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
408            if let Some(exec_spawn_id) = order.exec_spawn_id() {
409                self.index
410                    .exec_spawn_orders
411                    .entry(exec_spawn_id)
412                    .or_default()
413                    .insert(*client_order_id);
414            }
415
416            // 9: Build index.orders -> {ClientOrderId}
417            self.index.orders.insert(*client_order_id);
418
419            // 10: Build index.orders_open -> {ClientOrderId}
420            if order.is_open() {
421                self.index.orders_open.insert(*client_order_id);
422            }
423
424            // 11: Build index.orders_closed -> {ClientOrderId}
425            if order.is_closed() {
426                self.index.orders_closed.insert(*client_order_id);
427            }
428
429            // 12: Build index.orders_emulated -> {ClientOrderId}
430            if let Some(emulation_trigger) = order.emulation_trigger()
431                && emulation_trigger != TriggerType::NoTrigger
432                && !order.is_closed()
433            {
434                self.index.orders_emulated.insert(*client_order_id);
435            }
436
437            // 13: Build index.orders_inflight -> {ClientOrderId}
438            if order.is_inflight() {
439                self.index.orders_inflight.insert(*client_order_id);
440            }
441
442            // 14: Build index.strategies -> {StrategyId}
443            self.index.strategies.insert(strategy_id);
444
445            // 15: Build index.strategies -> {ExecAlgorithmId}
446            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
447                self.index.exec_algorithms.insert(exec_algorithm_id);
448            }
449        }
450
451        // Index positions
452        for (position_id, position) in &self.positions {
453            let instrument_id = position.instrument_id;
454            let venue = instrument_id.venue;
455            let strategy_id = position.strategy_id;
456
457            // 1: Build index.venue_positions -> {Venue, {PositionId}}
458            self.index
459                .venue_positions
460                .entry(venue)
461                .or_default()
462                .insert(*position_id);
463
464            // 2: Build index.position_strategy -> {PositionId, StrategyId}
465            self.index
466                .position_strategy
467                .insert(*position_id, position.strategy_id);
468
469            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
470            self.index
471                .position_orders
472                .entry(*position_id)
473                .or_default()
474                .extend(position.client_order_ids().into_iter());
475
476            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
477            self.index
478                .instrument_positions
479                .entry(instrument_id)
480                .or_default()
481                .insert(*position_id);
482
483            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
484            self.index
485                .strategy_positions
486                .entry(strategy_id)
487                .or_default()
488                .insert(*position_id);
489
490            // 6: Build index.account_positions -> {AccountId, {PositionId}}
491            self.index
492                .account_positions
493                .entry(position.account_id)
494                .or_default()
495                .insert(*position_id);
496
497            // 7: Build index.positions -> {PositionId}
498            self.index.positions.insert(*position_id);
499
500            // 8: Build index.positions_open -> {PositionId}
501            if position.is_open() {
502                self.index.positions_open.insert(*position_id);
503            }
504
505            // 9: Build index.positions_closed -> {PositionId}
506            if position.is_closed() {
507                self.index.positions_closed.insert(*position_id);
508            }
509
510            // 10: Build index.strategies -> {StrategyId}
511            self.index.strategies.insert(strategy_id);
512        }
513    }
514
515    /// Returns whether the cache has a backing database.
516    #[must_use]
517    pub const fn has_backing(&self) -> bool {
518        self.config.database.is_some()
519    }
520
521    // Calculate the unrealized profit and loss (PnL) for `position`.
522    #[must_use]
523    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
524        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
525            quote
526        } else {
527            log::warn!(
528                "Cannot calculate unrealized PnL for {}, no quotes for {}",
529                position.id,
530                position.instrument_id
531            );
532            return None;
533        };
534
535        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
536        let last = match position.side {
537            PositionSide::Flat | PositionSide::NoPositionSide => {
538                return Some(Money::new(0.0, position.settlement_currency));
539            }
540            PositionSide::Long => quote.bid_price,
541            PositionSide::Short => quote.ask_price,
542        };
543
544        Some(position.unrealized_pnl(last))
545    }
546
547    /// Checks integrity of data within the cache.
548    ///
549    /// All data should be loaded from the database prior to this call.
550    /// If an error is found then a log error message will also be produced.
551    ///
552    /// # Panics
553    ///
554    /// Panics if failure calling system clock.
555    #[must_use]
556    pub fn check_integrity(&mut self) -> bool {
557        let mut error_count = 0;
558        let failure = "Integrity failure";
559
560        // Get current timestamp in microseconds
561        let timestamp_us = SystemTime::now()
562            .duration_since(UNIX_EPOCH)
563            .expect("Time went backwards")
564            .as_micros();
565
566        log::info!("Checking data integrity");
567
568        // Check object caches
569        for account_id in self.accounts.keys() {
570            if !self
571                .index
572                .venue_account
573                .contains_key(&account_id.get_issuer())
574            {
575                log::error!(
576                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
577                );
578                error_count += 1;
579            }
580        }
581
582        for (client_order_id, order) in &self.orders {
583            if !self.index.order_strategy.contains_key(client_order_id) {
584                log::error!(
585                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
586                );
587                error_count += 1;
588            }
589
590            if !self.index.orders.contains(client_order_id) {
591                log::error!(
592                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
593                );
594                error_count += 1;
595            }
596
597            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
598                log::error!(
599                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
600                );
601                error_count += 1;
602            }
603
604            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
605                log::error!(
606                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
607                );
608                error_count += 1;
609            }
610
611            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
612                log::error!(
613                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
614                );
615                error_count += 1;
616            }
617
618            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
619                if !self
620                    .index
621                    .exec_algorithm_orders
622                    .contains_key(&exec_algorithm_id)
623                {
624                    log::error!(
625                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
626                    );
627                    error_count += 1;
628                }
629
630                if order.exec_spawn_id().is_none()
631                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
632                {
633                    log::error!(
634                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
635                    );
636                    error_count += 1;
637                }
638            }
639        }
640
641        for (position_id, position) in &self.positions {
642            if !self.index.position_strategy.contains_key(position_id) {
643                log::error!(
644                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
645                );
646                error_count += 1;
647            }
648
649            if !self.index.position_orders.contains_key(position_id) {
650                log::error!(
651                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
652                );
653                error_count += 1;
654            }
655
656            if !self.index.positions.contains(position_id) {
657                log::error!(
658                    "{failure} in positions: {position_id} not found in `self.index.positions`",
659                );
660                error_count += 1;
661            }
662
663            if position.is_open() && !self.index.positions_open.contains(position_id) {
664                log::error!(
665                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
666                );
667                error_count += 1;
668            }
669
670            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
671                log::error!(
672                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
673                );
674                error_count += 1;
675            }
676        }
677
678        // Check indexes
679        for account_id in self.index.venue_account.values() {
680            if !self.accounts.contains_key(account_id) {
681                log::error!(
682                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
683                );
684                error_count += 1;
685            }
686        }
687
688        for client_order_id in self.index.venue_order_ids.values() {
689            if !self.orders.contains_key(client_order_id) {
690                log::error!(
691                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
692                );
693                error_count += 1;
694            }
695        }
696
697        for client_order_id in self.index.client_order_ids.keys() {
698            if !self.orders.contains_key(client_order_id) {
699                log::error!(
700                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
701                );
702                error_count += 1;
703            }
704        }
705
706        for client_order_id in self.index.order_position.keys() {
707            if !self.orders.contains_key(client_order_id) {
708                log::error!(
709                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
710                );
711                error_count += 1;
712            }
713        }
714
715        // Check indexes
716        for client_order_id in self.index.order_strategy.keys() {
717            if !self.orders.contains_key(client_order_id) {
718                log::error!(
719                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
720                );
721                error_count += 1;
722            }
723        }
724
725        for position_id in self.index.position_strategy.keys() {
726            if !self.positions.contains_key(position_id) {
727                log::error!(
728                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
729                );
730                error_count += 1;
731            }
732        }
733
734        for position_id in self.index.position_orders.keys() {
735            if !self.positions.contains_key(position_id) {
736                log::error!(
737                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
738                );
739                error_count += 1;
740            }
741        }
742
743        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
744            for client_order_id in client_order_ids {
745                if !self.orders.contains_key(client_order_id) {
746                    log::error!(
747                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
748                    );
749                    error_count += 1;
750                }
751            }
752        }
753
754        for instrument_id in self.index.instrument_positions.keys() {
755            if !self.index.instrument_orders.contains_key(instrument_id) {
756                log::error!(
757                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
758                );
759                error_count += 1;
760            }
761        }
762
763        for client_order_ids in self.index.strategy_orders.values() {
764            for client_order_id in client_order_ids {
765                if !self.orders.contains_key(client_order_id) {
766                    log::error!(
767                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
768                    );
769                    error_count += 1;
770                }
771            }
772        }
773
774        for position_ids in self.index.strategy_positions.values() {
775            for position_id in position_ids {
776                if !self.positions.contains_key(position_id) {
777                    log::error!(
778                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
779                    );
780                    error_count += 1;
781                }
782            }
783        }
784
785        for client_order_id in &self.index.orders {
786            if !self.orders.contains_key(client_order_id) {
787                log::error!(
788                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
789                );
790                error_count += 1;
791            }
792        }
793
794        for client_order_id in &self.index.orders_emulated {
795            if !self.orders.contains_key(client_order_id) {
796                log::error!(
797                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
798                );
799                error_count += 1;
800            }
801        }
802
803        for client_order_id in &self.index.orders_inflight {
804            if !self.orders.contains_key(client_order_id) {
805                log::error!(
806                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
807                );
808                error_count += 1;
809            }
810        }
811
812        for client_order_id in &self.index.orders_open {
813            if !self.orders.contains_key(client_order_id) {
814                log::error!(
815                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
816                );
817                error_count += 1;
818            }
819        }
820
821        for client_order_id in &self.index.orders_closed {
822            if !self.orders.contains_key(client_order_id) {
823                log::error!(
824                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
825                );
826                error_count += 1;
827            }
828        }
829
830        for position_id in &self.index.positions {
831            if !self.positions.contains_key(position_id) {
832                log::error!(
833                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
834                );
835                error_count += 1;
836            }
837        }
838
839        for position_id in &self.index.positions_open {
840            if !self.positions.contains_key(position_id) {
841                log::error!(
842                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
843                );
844                error_count += 1;
845            }
846        }
847
848        for position_id in &self.index.positions_closed {
849            if !self.positions.contains_key(position_id) {
850                log::error!(
851                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
852                );
853                error_count += 1;
854            }
855        }
856
857        for strategy_id in &self.index.strategies {
858            if !self.index.strategy_orders.contains_key(strategy_id) {
859                log::error!(
860                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
861                );
862                error_count += 1;
863            }
864        }
865
866        for exec_algorithm_id in &self.index.exec_algorithms {
867            if !self
868                .index
869                .exec_algorithm_orders
870                .contains_key(exec_algorithm_id)
871            {
872                log::error!(
873                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
874                );
875                error_count += 1;
876            }
877        }
878
879        let total_us = SystemTime::now()
880            .duration_since(UNIX_EPOCH)
881            .expect("Time went backwards")
882            .as_micros()
883            - timestamp_us;
884
885        if error_count == 0 {
886            log::info!("Integrity check passed in {total_us}μs");
887            true
888        } else {
889            log::error!(
890                "Integrity check failed with {error_count} error{} in {total_us}μs",
891                if error_count == 1 { "" } else { "s" },
892            );
893            false
894        }
895    }
896
897    /// Checks for any residual open state and log warnings if any are found.
898    ///
899    ///'Open state' is considered to be open orders and open positions.
900    #[must_use]
901    pub fn check_residuals(&self) -> bool {
902        log::debug!("Checking residuals");
903
904        let mut residuals = false;
905
906        // Check for any open orders
907        for order in self.orders_open(None, None, None, None, None) {
908            residuals = true;
909            log::warn!("Residual {order}");
910        }
911
912        // Check for any open positions
913        for position in self.positions_open(None, None, None, None, None) {
914            residuals = true;
915            log::warn!("Residual {position}");
916        }
917
918        residuals
919    }
920
921    /// Purges all closed orders from the cache that are older than `buffer_secs`.
922    ///
923    ///
924    /// Only orders that have been closed for at least this amount of time will be purged.
925    /// A value of 0 means purge all closed orders regardless of when they were closed.
926    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
927        log::debug!(
928            "Purging closed orders{}",
929            if buffer_secs > 0 {
930                format!(" with buffer_secs={buffer_secs}")
931            } else {
932                String::new()
933            }
934        );
935
936        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
937
938        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
939
940        'outer: for client_order_id in self.index.orders_closed.clone() {
941            if let Some(order) = self.orders.get(&client_order_id)
942                && order.is_closed()
943                && let Some(ts_closed) = order.ts_closed()
944                && ts_closed + buffer_ns <= ts_now
945            {
946                // Check any linked orders (contingency orders)
947                if let Some(linked_order_ids) = order.linked_order_ids() {
948                    for linked_order_id in linked_order_ids {
949                        if let Some(linked_order) = self.orders.get(linked_order_id)
950                            && linked_order.is_open()
951                        {
952                            // Do not purge if linked order still open
953                            continue 'outer;
954                        }
955                    }
956                }
957
958                if let Some(order_list_id) = order.order_list_id() {
959                    affected_order_list_ids.insert(order_list_id);
960                }
961
962                self.purge_order(client_order_id);
963            }
964        }
965
966        for order_list_id in affected_order_list_ids {
967            if let Some(order_list) = self.order_lists.get(&order_list_id) {
968                let all_purged = order_list
969                    .client_order_ids
970                    .iter()
971                    .all(|id| !self.orders.contains_key(id));
972
973                if all_purged {
974                    self.order_lists.remove(&order_list_id);
975                    log::info!("Purged {order_list_id}");
976                }
977            }
978        }
979    }
980
981    /// Purges all closed positions from the cache that are older than `buffer_secs`.
982    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
983        log::debug!(
984            "Purging closed positions{}",
985            if buffer_secs > 0 {
986                format!(" with buffer_secs={buffer_secs}")
987            } else {
988                String::new()
989            }
990        );
991
992        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
993
994        for position_id in self.index.positions_closed.clone() {
995            if let Some(position) = self.positions.get(&position_id)
996                && position.is_closed()
997                && let Some(ts_closed) = position.ts_closed
998                && ts_closed + buffer_ns <= ts_now
999            {
1000                self.purge_position(position_id);
1001            }
1002        }
1003    }
1004
1005    /// Purges the order with the `client_order_id` from the cache (if found).
1006    ///
1007    /// For safety, an order is prevented from being purged if it's open.
1008    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1009        // Check if order exists and is safe to purge before removing
1010        let order = self.orders.get(&client_order_id).cloned();
1011
1012        // Prevent purging open orders
1013        if let Some(ref ord) = order
1014            && ord.is_open()
1015        {
1016            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1017            return;
1018        }
1019
1020        // If order exists in cache, remove it and clean up order-specific indices
1021        if let Some(ref ord) = order {
1022            // Safe to purge
1023            self.orders.remove(&client_order_id);
1024
1025            // Remove order from venue index
1026            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1027            {
1028                venue_orders.remove(&client_order_id);
1029            }
1030
1031            // Remove venue order ID index if exists
1032            if let Some(venue_order_id) = ord.venue_order_id() {
1033                self.index.venue_order_ids.remove(&venue_order_id);
1034            }
1035
1036            // Remove from instrument orders index
1037            if let Some(instrument_orders) =
1038                self.index.instrument_orders.get_mut(&ord.instrument_id())
1039            {
1040                instrument_orders.remove(&client_order_id);
1041            }
1042
1043            // Remove from position orders index if associated with a position
1044            if let Some(position_id) = ord.position_id()
1045                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1046            {
1047                position_orders.remove(&client_order_id);
1048            }
1049
1050            // Remove from exec algorithm orders index if it has an exec algorithm
1051            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1052                && let Some(exec_algorithm_orders) =
1053                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1054            {
1055                exec_algorithm_orders.remove(&client_order_id);
1056            }
1057
1058            // Clean up strategy orders reverse index
1059            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1060                strategy_orders.remove(&client_order_id);
1061                if strategy_orders.is_empty() {
1062                    self.index.strategy_orders.remove(&ord.strategy_id());
1063                }
1064            }
1065
1066            // Clean up account orders index
1067            if let Some(account_id) = ord.account_id()
1068                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1069            {
1070                account_orders.remove(&client_order_id);
1071                if account_orders.is_empty() {
1072                    self.index.account_orders.remove(&account_id);
1073                }
1074            }
1075
1076            // Clean up exec spawn reverse index (if this order is a spawned child)
1077            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1078                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1079            {
1080                spawn_orders.remove(&client_order_id);
1081                if spawn_orders.is_empty() {
1082                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1083                }
1084            }
1085
1086            log::info!("Purged order {client_order_id}");
1087        } else {
1088            log::warn!("Order {client_order_id} not found when purging");
1089        }
1090
1091        // Always clean up order indices (even if order was not in cache)
1092        self.index.order_position.remove(&client_order_id);
1093        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1094        self.index.order_client.remove(&client_order_id);
1095        self.index.client_order_ids.remove(&client_order_id);
1096
1097        // Clean up reverse index when order not in cache (using forward index)
1098        if let Some(strategy_id) = strategy_id
1099            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1100        {
1101            strategy_orders.remove(&client_order_id);
1102            if strategy_orders.is_empty() {
1103                self.index.strategy_orders.remove(&strategy_id);
1104            }
1105        }
1106
1107        // Remove spawn parent entry if this order was a spawn root
1108        self.index.exec_spawn_orders.remove(&client_order_id);
1109
1110        self.index.orders.remove(&client_order_id);
1111        self.index.orders_open.remove(&client_order_id);
1112        self.index.orders_closed.remove(&client_order_id);
1113        self.index.orders_emulated.remove(&client_order_id);
1114        self.index.orders_inflight.remove(&client_order_id);
1115        self.index.orders_pending_cancel.remove(&client_order_id);
1116    }
1117
1118    /// Purges the position with the `position_id` from the cache (if found).
1119    ///
1120    /// For safety, a position is prevented from being purged if it's open.
1121    pub fn purge_position(&mut self, position_id: PositionId) {
1122        // Check if position exists and is safe to purge before removing
1123        let position = self.positions.get(&position_id).cloned();
1124
1125        // Prevent purging open positions
1126        if let Some(ref pos) = position
1127            && pos.is_open()
1128        {
1129            log::warn!("Position {position_id} found open when purging, skipping purge");
1130            return;
1131        }
1132
1133        // If position exists in cache, remove it and clean up position-specific indices
1134        if let Some(ref pos) = position {
1135            self.positions.remove(&position_id);
1136
1137            // Remove from venue positions index
1138            if let Some(venue_positions) =
1139                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1140            {
1141                venue_positions.remove(&position_id);
1142            }
1143
1144            // Remove from instrument positions index
1145            if let Some(instrument_positions) =
1146                self.index.instrument_positions.get_mut(&pos.instrument_id)
1147            {
1148                instrument_positions.remove(&position_id);
1149            }
1150
1151            // Remove from strategy positions index
1152            if let Some(strategy_positions) =
1153                self.index.strategy_positions.get_mut(&pos.strategy_id)
1154            {
1155                strategy_positions.remove(&position_id);
1156            }
1157
1158            // Remove from account positions index
1159            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1160                account_positions.remove(&position_id);
1161                if account_positions.is_empty() {
1162                    self.index.account_positions.remove(&pos.account_id);
1163                }
1164            }
1165
1166            // Remove position ID from orders that reference it
1167            for client_order_id in pos.client_order_ids() {
1168                self.index.order_position.remove(&client_order_id);
1169            }
1170
1171            log::info!("Purged position {position_id}");
1172        } else {
1173            log::warn!("Position {position_id} not found when purging");
1174        }
1175
1176        // Always clean up position indices (even if position not in cache)
1177        self.index.position_strategy.remove(&position_id);
1178        self.index.position_orders.remove(&position_id);
1179        self.index.positions.remove(&position_id);
1180        self.index.positions_open.remove(&position_id);
1181        self.index.positions_closed.remove(&position_id);
1182
1183        // Always clean up position snapshots (even if position not in cache)
1184        self.position_snapshots.remove(&position_id);
1185    }
1186
1187    /// Purges all account state events which are outside the lookback window.
1188    ///
1189    /// Only events which are outside the lookback window will be purged.
1190    /// A value of 0 means purge all account state events.
1191    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1192        log::debug!(
1193            "Purging account events{}",
1194            if lookback_secs > 0 {
1195                format!(" with lookback_secs={lookback_secs}")
1196            } else {
1197                String::new()
1198            }
1199        );
1200
1201        for account in self.accounts.values_mut() {
1202            let event_count = account.event_count();
1203            account.purge_account_events(ts_now, lookback_secs);
1204            let count_diff = event_count - account.event_count();
1205            if count_diff > 0 {
1206                log::info!(
1207                    "Purged {} event(s) from account {}",
1208                    count_diff,
1209                    account.id()
1210                );
1211            }
1212        }
1213    }
1214
1215    /// Clears the caches index.
1216    pub fn clear_index(&mut self) {
1217        self.index.clear();
1218        log::debug!("Cleared index");
1219    }
1220
1221    /// Resets the cache.
1222    ///
1223    /// All stateful fields are reset to their initial value.
1224    pub fn reset(&mut self) {
1225        log::debug!("Resetting cache");
1226
1227        self.general.clear();
1228        self.currencies.clear();
1229        self.instruments.clear();
1230        self.synthetics.clear();
1231        self.books.clear();
1232        self.own_books.clear();
1233        self.quotes.clear();
1234        self.trades.clear();
1235        self.mark_xrates.clear();
1236        self.mark_prices.clear();
1237        self.index_prices.clear();
1238        self.funding_rates.clear();
1239        self.bars.clear();
1240        self.accounts.clear();
1241        self.orders.clear();
1242        self.order_lists.clear();
1243        self.positions.clear();
1244        self.position_snapshots.clear();
1245        self.greeks.clear();
1246        self.yield_curves.clear();
1247
1248        #[cfg(feature = "defi")]
1249        {
1250            self.defi.pools.clear();
1251            self.defi.pool_profilers.clear();
1252        }
1253
1254        self.clear_index();
1255
1256        log::info!("Reset cache");
1257    }
1258
1259    /// Dispose of the cache which will close any underlying database adapter.
1260    ///
1261    /// If closing the database connection fails, an error is logged.
1262    pub fn dispose(&mut self) {
1263        if let Some(database) = &mut self.database
1264            && let Err(e) = database.close()
1265        {
1266            log::error!("Failed to close database during dispose: {e}");
1267        }
1268    }
1269
1270    /// Flushes the caches database which permanently removes all persisted data.
1271    ///
1272    /// If flushing the database connection fails, an error is logged.
1273    pub fn flush_db(&mut self) {
1274        if let Some(database) = &mut self.database
1275            && let Err(e) = database.flush()
1276        {
1277            log::error!("Failed to flush database: {e}");
1278        }
1279    }
1280
1281    /// Adds a raw bytes `value` to the cache under the `key`.
1282    ///
1283    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1284    ///
1285    /// # Errors
1286    ///
1287    /// Returns an error if persisting the entry to the backing database fails.
1288    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1289        check_valid_string_ascii(key, stringify!(key))?;
1290        check_predicate_false(value.is_empty(), stringify!(value))?;
1291
1292        log::debug!("Adding general {key}");
1293        self.general.insert(key.to_string(), value.clone());
1294
1295        if let Some(database) = &mut self.database {
1296            database.add(key.to_string(), value)?;
1297        }
1298        Ok(())
1299    }
1300
1301    /// Adds an `OrderBook` to the cache.
1302    ///
1303    /// # Errors
1304    ///
1305    /// Returns an error if persisting the order book to the backing database fails.
1306    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1307        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1308
1309        if self.config.save_market_data
1310            && let Some(database) = &mut self.database
1311        {
1312            database.add_order_book(&book)?;
1313        }
1314
1315        self.books.insert(book.instrument_id, book);
1316        Ok(())
1317    }
1318
1319    /// Adds an `OwnOrderBook` to the cache.
1320    ///
1321    /// # Errors
1322    ///
1323    /// Returns an error if persisting the own order book fails.
1324    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1325        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1326
1327        self.own_books.insert(own_book.instrument_id, own_book);
1328        Ok(())
1329    }
1330
1331    /// Adds the `mark_price` update to the cache.
1332    ///
1333    /// # Errors
1334    ///
1335    /// Returns an error if persisting the mark price to the backing database fails.
1336    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1337        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1338
1339        if self.config.save_market_data {
1340            // TODO: Placeholder and return Result for consistency
1341        }
1342
1343        let mark_prices_deque = self
1344            .mark_prices
1345            .entry(mark_price.instrument_id)
1346            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1347        mark_prices_deque.push_front(mark_price);
1348        Ok(())
1349    }
1350
1351    /// Adds the `index_price` update to the cache.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if persisting the index price to the backing database fails.
1356    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1357        log::debug!(
1358            "Adding `IndexPriceUpdate` for {}",
1359            index_price.instrument_id
1360        );
1361
1362        if self.config.save_market_data {
1363            // TODO: Placeholder and return Result for consistency
1364        }
1365
1366        let index_prices_deque = self
1367            .index_prices
1368            .entry(index_price.instrument_id)
1369            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1370        index_prices_deque.push_front(index_price);
1371        Ok(())
1372    }
1373
1374    /// Adds the `funding_rate` update to the cache.
1375    ///
1376    /// # Errors
1377    ///
1378    /// Returns an error if persisting the funding rate update to the backing database fails.
1379    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1380        log::debug!(
1381            "Adding `FundingRateUpdate` for {}",
1382            funding_rate.instrument_id
1383        );
1384
1385        if self.config.save_market_data {
1386            // TODO: Placeholder and return Result for consistency
1387        }
1388
1389        let funding_rates_deque = self
1390            .funding_rates
1391            .entry(funding_rate.instrument_id)
1392            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1393        funding_rates_deque.push_front(funding_rate);
1394        Ok(())
1395    }
1396
1397    /// Adds the given `funding rates` to the cache.
1398    ///
1399    /// # Errors
1400    ///
1401    /// Returns an error if persisting the trade ticks to the backing database fails.
1402    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1403        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1404
1405        let instrument_id = funding_rates[0].instrument_id;
1406        log::debug!(
1407            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1408            funding_rates.len()
1409        );
1410
1411        if self.config.save_market_data
1412            && let Some(database) = &mut self.database
1413        {
1414            for funding_rate in funding_rates {
1415                database.add_funding_rate(funding_rate)?;
1416            }
1417        }
1418
1419        let funding_rate_deque = self
1420            .funding_rates
1421            .entry(instrument_id)
1422            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1423
1424        for funding_rate in funding_rates {
1425            funding_rate_deque.push_front(*funding_rate);
1426        }
1427        Ok(())
1428    }
1429
1430    /// Adds the `quote` tick to the cache.
1431    ///
1432    /// # Errors
1433    ///
1434    /// Returns an error if persisting the quote tick to the backing database fails.
1435    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1436        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1437
1438        if self.config.save_market_data
1439            && let Some(database) = &mut self.database
1440        {
1441            database.add_quote(&quote)?;
1442        }
1443
1444        let quotes_deque = self
1445            .quotes
1446            .entry(quote.instrument_id)
1447            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1448        quotes_deque.push_front(quote);
1449        Ok(())
1450    }
1451
1452    /// Adds the `quotes` to the cache.
1453    ///
1454    /// # Errors
1455    ///
1456    /// Returns an error if persisting the quote ticks to the backing database fails.
1457    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1458        check_slice_not_empty(quotes, stringify!(quotes))?;
1459
1460        let instrument_id = quotes[0].instrument_id;
1461        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1462
1463        if self.config.save_market_data
1464            && let Some(database) = &mut self.database
1465        {
1466            for quote in quotes {
1467                database.add_quote(quote)?;
1468            }
1469        }
1470
1471        let quotes_deque = self
1472            .quotes
1473            .entry(instrument_id)
1474            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1475
1476        for quote in quotes {
1477            quotes_deque.push_front(*quote);
1478        }
1479        Ok(())
1480    }
1481
1482    /// Adds the `trade` tick to the cache.
1483    ///
1484    /// # Errors
1485    ///
1486    /// Returns an error if persisting the trade tick to the backing database fails.
1487    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1488        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1489
1490        if self.config.save_market_data
1491            && let Some(database) = &mut self.database
1492        {
1493            database.add_trade(&trade)?;
1494        }
1495
1496        let trades_deque = self
1497            .trades
1498            .entry(trade.instrument_id)
1499            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1500        trades_deque.push_front(trade);
1501        Ok(())
1502    }
1503
1504    /// Adds the give `trades` to the cache.
1505    ///
1506    /// # Errors
1507    ///
1508    /// Returns an error if persisting the trade ticks to the backing database fails.
1509    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1510        check_slice_not_empty(trades, stringify!(trades))?;
1511
1512        let instrument_id = trades[0].instrument_id;
1513        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1514
1515        if self.config.save_market_data
1516            && let Some(database) = &mut self.database
1517        {
1518            for trade in trades {
1519                database.add_trade(trade)?;
1520            }
1521        }
1522
1523        let trades_deque = self
1524            .trades
1525            .entry(instrument_id)
1526            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1527
1528        for trade in trades {
1529            trades_deque.push_front(*trade);
1530        }
1531        Ok(())
1532    }
1533
1534    /// Adds the `bar` to the cache.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if persisting the bar to the backing database fails.
1539    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1540        log::debug!("Adding `Bar` {}", bar.bar_type);
1541
1542        if self.config.save_market_data
1543            && let Some(database) = &mut self.database
1544        {
1545            database.add_bar(&bar)?;
1546        }
1547
1548        let bars = self
1549            .bars
1550            .entry(bar.bar_type)
1551            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1552        bars.push_front(bar);
1553        Ok(())
1554    }
1555
1556    /// Adds the `bars` to the cache.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns an error if persisting the bars to the backing database fails.
1561    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1562        check_slice_not_empty(bars, stringify!(bars))?;
1563
1564        let bar_type = bars[0].bar_type;
1565        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1566
1567        if self.config.save_market_data
1568            && let Some(database) = &mut self.database
1569        {
1570            for bar in bars {
1571                database.add_bar(bar)?;
1572            }
1573        }
1574
1575        let bars_deque = self
1576            .bars
1577            .entry(bar_type)
1578            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1579
1580        for bar in bars {
1581            bars_deque.push_front(*bar);
1582        }
1583        Ok(())
1584    }
1585
1586    /// Adds the `greeks` data to the cache.
1587    ///
1588    /// # Errors
1589    ///
1590    /// Returns an error if persisting the greeks data to the backing database fails.
1591    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1592        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1593
1594        if self.config.save_market_data
1595            && let Some(_database) = &mut self.database
1596        {
1597            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1598        }
1599
1600        self.greeks.insert(greeks.instrument_id, greeks);
1601        Ok(())
1602    }
1603
1604    /// Gets the greeks data for the `instrument_id`.
1605    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1606        self.greeks.get(instrument_id).cloned()
1607    }
1608
1609    /// Adds the `yield_curve` data to the cache.
1610    ///
1611    /// # Errors
1612    ///
1613    /// Returns an error if persisting the yield curve data to the backing database fails.
1614    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1615        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1616
1617        if self.config.save_market_data
1618            && let Some(_database) = &mut self.database
1619        {
1620            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1621        }
1622
1623        self.yield_curves
1624            .insert(yield_curve.curve_name.clone(), yield_curve);
1625        Ok(())
1626    }
1627
1628    /// Gets the yield curve for the `key`.
1629    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1630        self.yield_curves.get(key).map(|curve| {
1631            let curve_clone = curve.clone();
1632            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1633                as Box<dyn Fn(f64) -> f64>
1634        })
1635    }
1636
1637    /// Adds the `currency` to the cache.
1638    ///
1639    /// # Errors
1640    ///
1641    /// Returns an error if persisting the currency to the backing database fails.
1642    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1643        if self.currencies.contains_key(&currency.code) {
1644            return Ok(());
1645        }
1646        log::debug!("Adding `Currency` {}", currency.code);
1647
1648        if let Some(database) = &mut self.database {
1649            database.add_currency(&currency)?;
1650        }
1651
1652        self.currencies.insert(currency.code, currency);
1653        Ok(())
1654    }
1655
1656    /// Adds the `instrument` to the cache.
1657    ///
1658    /// # Errors
1659    ///
1660    /// Returns an error if persisting the instrument to the backing database fails.
1661    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1662        log::debug!("Adding `Instrument` {}", instrument.id());
1663
1664        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
1665        if let Some(base_currency) = instrument.base_currency() {
1666            self.add_currency(base_currency)?;
1667        }
1668        self.add_currency(instrument.quote_currency())?;
1669        self.add_currency(instrument.settlement_currency())?;
1670
1671        if let Some(database) = &mut self.database {
1672            database.add_instrument(&instrument)?;
1673        }
1674
1675        self.instruments.insert(instrument.id(), instrument);
1676        Ok(())
1677    }
1678
1679    /// Adds the `synthetic` instrument to the cache.
1680    ///
1681    /// # Errors
1682    ///
1683    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1684    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1685        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1686
1687        if let Some(database) = &mut self.database {
1688            database.add_synthetic(&synthetic)?;
1689        }
1690
1691        self.synthetics.insert(synthetic.id, synthetic);
1692        Ok(())
1693    }
1694
1695    /// Adds the `account` to the cache.
1696    ///
1697    /// # Errors
1698    ///
1699    /// Returns an error if persisting the account to the backing database fails.
1700    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1701        log::debug!("Adding `Account` {}", account.id());
1702
1703        if let Some(database) = &mut self.database {
1704            database.add_account(&account)?;
1705        }
1706
1707        let account_id = account.id();
1708        self.accounts.insert(account_id, account);
1709        self.index
1710            .venue_account
1711            .insert(account_id.get_issuer(), account_id);
1712        Ok(())
1713    }
1714
1715    /// Indexes the `client_order_id` with the `venue_order_id`.
1716    ///
1717    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1718    ///
1719    /// # Errors
1720    ///
1721    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1722    pub fn add_venue_order_id(
1723        &mut self,
1724        client_order_id: &ClientOrderId,
1725        venue_order_id: &VenueOrderId,
1726        overwrite: bool,
1727    ) -> anyhow::Result<()> {
1728        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1729            && !overwrite
1730            && existing_venue_order_id != venue_order_id
1731        {
1732            anyhow::bail!(
1733                "Existing {existing_venue_order_id} for {client_order_id}
1734                    did not match the given {venue_order_id}.
1735                    If you are writing a test then try a different `venue_order_id`,
1736                    otherwise this is probably a bug."
1737            );
1738        }
1739
1740        self.index
1741            .client_order_ids
1742            .insert(*client_order_id, *venue_order_id);
1743        self.index
1744            .venue_order_ids
1745            .insert(*venue_order_id, *client_order_id);
1746
1747        Ok(())
1748    }
1749
1750    /// Adds the `order` to the cache indexed with any given identifiers.
1751    ///
1752    /// # Parameters
1753    ///
1754    /// `override_existing`: If the added order should 'override' any existing order and replace
1755    /// it in the cache. This is currently used for emulated orders which are
1756    /// being released and transformed into another type.
1757    ///
1758    /// # Errors
1759    ///
1760    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1761    pub fn add_order(
1762        &mut self,
1763        order: OrderAny,
1764        position_id: Option<PositionId>,
1765        client_id: Option<ClientId>,
1766        replace_existing: bool,
1767    ) -> anyhow::Result<()> {
1768        let instrument_id = order.instrument_id();
1769        let venue = instrument_id.venue;
1770        let client_order_id = order.client_order_id();
1771        let strategy_id = order.strategy_id();
1772        let exec_algorithm_id = order.exec_algorithm_id();
1773        let exec_spawn_id = order.exec_spawn_id();
1774
1775        if !replace_existing {
1776            check_key_not_in_map(
1777                &client_order_id,
1778                &self.orders,
1779                stringify!(client_order_id),
1780                stringify!(orders),
1781            )?;
1782        }
1783
1784        log::debug!("Adding {order:?}");
1785
1786        self.index.orders.insert(client_order_id);
1787        self.index
1788            .order_strategy
1789            .insert(client_order_id, strategy_id);
1790        self.index.strategies.insert(strategy_id);
1791
1792        // Update venue -> orders index
1793        self.index
1794            .venue_orders
1795            .entry(venue)
1796            .or_default()
1797            .insert(client_order_id);
1798
1799        // Update instrument -> orders index
1800        self.index
1801            .instrument_orders
1802            .entry(instrument_id)
1803            .or_default()
1804            .insert(client_order_id);
1805
1806        // Update strategy -> orders index
1807        self.index
1808            .strategy_orders
1809            .entry(strategy_id)
1810            .or_default()
1811            .insert(client_order_id);
1812
1813        // Update account -> orders index (if account_id known at creation)
1814        if let Some(account_id) = order.account_id() {
1815            self.index
1816                .account_orders
1817                .entry(account_id)
1818                .or_default()
1819                .insert(client_order_id);
1820        }
1821
1822        // Update exec_algorithm -> orders index
1823        if let Some(exec_algorithm_id) = exec_algorithm_id {
1824            self.index.exec_algorithms.insert(exec_algorithm_id);
1825
1826            self.index
1827                .exec_algorithm_orders
1828                .entry(exec_algorithm_id)
1829                .or_default()
1830                .insert(client_order_id);
1831        }
1832
1833        // Update exec_spawn -> orders index
1834        if let Some(exec_spawn_id) = exec_spawn_id {
1835            self.index
1836                .exec_spawn_orders
1837                .entry(exec_spawn_id)
1838                .or_default()
1839                .insert(client_order_id);
1840        }
1841
1842        // Update emulation index
1843        if let Some(emulation_trigger) = order.emulation_trigger()
1844            && emulation_trigger != TriggerType::NoTrigger
1845        {
1846            self.index.orders_emulated.insert(client_order_id);
1847        }
1848
1849        // Index position ID if provided
1850        if let Some(position_id) = position_id {
1851            self.add_position_id(
1852                &position_id,
1853                &order.instrument_id().venue,
1854                &client_order_id,
1855                &strategy_id,
1856            )?;
1857        }
1858
1859        // Index client ID if provided
1860        if let Some(client_id) = client_id {
1861            self.index.order_client.insert(client_order_id, client_id);
1862            log::debug!("Indexed {client_id:?}");
1863        }
1864
1865        if let Some(database) = &mut self.database {
1866            database.add_order(&order, client_id)?;
1867            // TODO: Implement
1868            // if self.config.snapshot_orders {
1869            //     database.snapshot_order_state(order)?;
1870            // }
1871        }
1872
1873        self.orders.insert(client_order_id, order);
1874
1875        Ok(())
1876    }
1877
1878    /// Adds the `order_list` to the cache.
1879    ///
1880    /// # Errors
1881    ///
1882    /// Returns an error if the order list ID is already contained in the cache.
1883    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1884        let order_list_id = order_list.id;
1885        check_key_not_in_map(
1886            &order_list_id,
1887            &self.order_lists,
1888            stringify!(order_list_id),
1889            stringify!(order_lists),
1890        )?;
1891
1892        log::debug!("Adding {order_list:?}");
1893        self.order_lists.insert(order_list_id, order_list);
1894        Ok(())
1895    }
1896
1897    /// Indexes the `position_id` with the other given IDs.
1898    ///
1899    /// # Errors
1900    ///
1901    /// Returns an error if indexing position ID in the backing database fails.
1902    pub fn add_position_id(
1903        &mut self,
1904        position_id: &PositionId,
1905        venue: &Venue,
1906        client_order_id: &ClientOrderId,
1907        strategy_id: &StrategyId,
1908    ) -> anyhow::Result<()> {
1909        self.index
1910            .order_position
1911            .insert(*client_order_id, *position_id);
1912
1913        // Index: ClientOrderId -> PositionId
1914        if let Some(database) = &mut self.database {
1915            database.index_order_position(*client_order_id, *position_id)?;
1916        }
1917
1918        // Index: PositionId -> StrategyId
1919        self.index
1920            .position_strategy
1921            .insert(*position_id, *strategy_id);
1922
1923        // Index: PositionId -> set[ClientOrderId]
1924        self.index
1925            .position_orders
1926            .entry(*position_id)
1927            .or_default()
1928            .insert(*client_order_id);
1929
1930        // Index: StrategyId -> set[PositionId]
1931        self.index
1932            .strategy_positions
1933            .entry(*strategy_id)
1934            .or_default()
1935            .insert(*position_id);
1936
1937        // Index: Venue -> set[PositionId]
1938        self.index
1939            .venue_positions
1940            .entry(*venue)
1941            .or_default()
1942            .insert(*position_id);
1943
1944        Ok(())
1945    }
1946
1947    /// Adds the `position` to the cache.
1948    ///
1949    /// # Errors
1950    ///
1951    /// Returns an error if persisting the position to the backing database fails.
1952    pub fn add_position(&mut self, position: Position, _oms_type: OmsType) -> anyhow::Result<()> {
1953        self.positions.insert(position.id, position.clone());
1954        self.index.positions.insert(position.id);
1955        self.index.positions_open.insert(position.id);
1956        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
1957
1958        log::debug!("Adding {position}");
1959
1960        self.add_position_id(
1961            &position.id,
1962            &position.instrument_id.venue,
1963            &position.opening_order_id,
1964            &position.strategy_id,
1965        )?;
1966
1967        let venue = position.instrument_id.venue;
1968        let venue_positions = self.index.venue_positions.entry(venue).or_default();
1969        venue_positions.insert(position.id);
1970
1971        // Index: InstrumentId -> AHashSet
1972        let instrument_id = position.instrument_id;
1973        let instrument_positions = self
1974            .index
1975            .instrument_positions
1976            .entry(instrument_id)
1977            .or_default();
1978        instrument_positions.insert(position.id);
1979
1980        // Index: AccountId -> AHashSet<PositionId>
1981        self.index
1982            .account_positions
1983            .entry(position.account_id)
1984            .or_default()
1985            .insert(position.id);
1986
1987        if let Some(database) = &mut self.database {
1988            database.add_position(&position)?;
1989            // TODO: Implement position snapshots
1990            // if self.snapshot_positions {
1991            //     database.snapshot_position_state(
1992            //         position,
1993            //         position.ts_last,
1994            //         self.calculate_unrealized_pnl(&position),
1995            //     )?;
1996            // }
1997        }
1998
1999        Ok(())
2000    }
2001
2002    /// Updates the `account` in the cache.
2003    ///
2004    /// # Errors
2005    ///
2006    /// Returns an error if updating the account in the database fails.
2007    pub fn update_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2008        let account_id = account.id();
2009        self.accounts.insert(account_id, account.clone());
2010
2011        if let Some(database) = &mut self.database {
2012            database.update_account(&account)?;
2013        }
2014        Ok(())
2015    }
2016
2017    /// Updates the `order` in the cache.
2018    ///
2019    /// # Errors
2020    ///
2021    /// Returns an error if updating the order in the database fails.
2022    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2023        let client_order_id = order.client_order_id();
2024
2025        // Update venue order ID
2026        if let Some(venue_order_id) = order.venue_order_id() {
2027            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2028            // venues which use a cancel+replace update strategy.
2029            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2030                // TODO: If the last event was `OrderUpdated` then overwrite should be true
2031                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2032            }
2033        }
2034
2035        // Update in-flight state
2036        if order.is_inflight() {
2037            self.index.orders_inflight.insert(client_order_id);
2038        } else {
2039            self.index.orders_inflight.remove(&client_order_id);
2040        }
2041
2042        // Update open/closed state
2043        if order.is_open() {
2044            self.index.orders_closed.remove(&client_order_id);
2045            self.index.orders_open.insert(client_order_id);
2046        } else if order.is_closed() {
2047            self.index.orders_open.remove(&client_order_id);
2048            self.index.orders_pending_cancel.remove(&client_order_id);
2049            self.index.orders_closed.insert(client_order_id);
2050        }
2051
2052        // Update emulation index
2053        if let Some(emulation_trigger) = order.emulation_trigger()
2054            && emulation_trigger != TriggerType::NoTrigger
2055            && !order.is_closed()
2056        {
2057            self.index.orders_emulated.insert(client_order_id);
2058        } else {
2059            self.index.orders_emulated.remove(&client_order_id);
2060        }
2061
2062        // Update account orders index when account_id becomes available
2063        if let Some(account_id) = order.account_id() {
2064            self.index
2065                .account_orders
2066                .entry(account_id)
2067                .or_default()
2068                .insert(client_order_id);
2069        }
2070
2071        // Update own book
2072        if self.own_order_book(&order.instrument_id()).is_some()
2073            && should_handle_own_book_order(order)
2074        {
2075            self.update_own_order_book(order);
2076        }
2077
2078        if let Some(database) = &mut self.database {
2079            database.update_order(order.last_event())?;
2080            // TODO: Implement order snapshots
2081            // if self.snapshot_orders {
2082            //     database.snapshot_order_state(order)?;
2083            // }
2084        }
2085
2086        // update the order in the cache
2087        self.orders.insert(client_order_id, order.clone());
2088
2089        Ok(())
2090    }
2091
2092    /// Updates the `order` as pending cancel locally.
2093    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2094        self.index
2095            .orders_pending_cancel
2096            .insert(order.client_order_id());
2097    }
2098
2099    /// Updates the `position` in the cache.
2100    ///
2101    /// # Errors
2102    ///
2103    /// Returns an error if updating the position in the database fails.
2104    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2105        // Update open/closed state
2106
2107        if position.is_open() {
2108            self.index.positions_open.insert(position.id);
2109            self.index.positions_closed.remove(&position.id);
2110        } else {
2111            self.index.positions_closed.insert(position.id);
2112            self.index.positions_open.remove(&position.id);
2113        }
2114
2115        if let Some(database) = &mut self.database {
2116            database.update_position(position)?;
2117            // TODO: Implement order snapshots
2118            // if self.snapshot_orders {
2119            //     database.snapshot_order_state(order)?;
2120            // }
2121        }
2122
2123        self.positions.insert(position.id, position.clone());
2124
2125        Ok(())
2126    }
2127
2128    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2129    /// serializing it, and storing it in the position snapshots.
2130    ///
2131    /// # Errors
2132    ///
2133    /// Returns an error if serializing or storing the position snapshot fails.
2134    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2135        let position_id = position.id;
2136
2137        let mut copied_position = position.clone();
2138        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2139        copied_position.id = PositionId::new(new_id);
2140
2141        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2142        let position_serialized = serde_json::to_vec(&copied_position)?;
2143
2144        let snapshots: Option<&Bytes> = self.position_snapshots.get(&position_id);
2145        let new_snapshots = match snapshots {
2146            Some(existing_snapshots) => {
2147                let mut combined = existing_snapshots.to_vec();
2148                combined.extend(position_serialized);
2149                Bytes::from(combined)
2150            }
2151            None => Bytes::from(position_serialized),
2152        };
2153        self.position_snapshots.insert(position_id, new_snapshots);
2154
2155        log::debug!("Snapshot {copied_position}");
2156        Ok(())
2157    }
2158
2159    /// Creates a snapshot of the `position` state in the database.
2160    ///
2161    /// # Errors
2162    ///
2163    /// Returns an error if snapshotting the position state fails.
2164    pub fn snapshot_position_state(
2165        &mut self,
2166        position: &Position,
2167        // ts_snapshot: u64,
2168        // unrealized_pnl: Option<Money>,
2169        open_only: Option<bool>,
2170    ) -> anyhow::Result<()> {
2171        let open_only = open_only.unwrap_or(true);
2172
2173        if open_only && !position.is_open() {
2174            return Ok(());
2175        }
2176
2177        if let Some(database) = &mut self.database {
2178            database.snapshot_position_state(position).map_err(|e| {
2179                log::error!(
2180                    "Failed to snapshot position state for {}: {e:?}",
2181                    position.id
2182                );
2183                e
2184            })?;
2185        } else {
2186            log::warn!(
2187                "Cannot snapshot position state for {} (no database configured)",
2188                position.id
2189            );
2190        }
2191
2192        // Ok(())
2193        todo!()
2194    }
2195
2196    /// Gets the OMS type for the `position_id`.
2197    #[must_use]
2198    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2199        // Get OMS type from the index
2200        if self.index.position_strategy.contains_key(position_id) {
2201            // For now, we'll default to NETTING
2202            // TODO: Store and retrieve actual OMS type per position
2203            Some(OmsType::Netting)
2204        } else {
2205            None
2206        }
2207    }
2208
2209    /// Gets position snapshot bytes for the `position_id`.
2210    #[must_use]
2211    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<u8>> {
2212        self.position_snapshots.get(position_id).map(|b| b.to_vec())
2213    }
2214
2215    /// Gets position snapshot IDs for the `instrument_id`.
2216    #[must_use]
2217    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2218        // Get snapshot position IDs that match the instrument
2219        let mut result = AHashSet::new();
2220        for (position_id, _) in &self.position_snapshots {
2221            // Check if this position is for the requested instrument
2222            if let Some(position) = self.positions.get(position_id)
2223                && position.instrument_id == *instrument_id
2224            {
2225                result.insert(*position_id);
2226            }
2227        }
2228        result
2229    }
2230
2231    /// Snapshots the `order` state in the database.
2232    ///
2233    /// # Errors
2234    ///
2235    /// Returns an error if snapshotting the order state fails.
2236    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2237        let database = if let Some(database) = &self.database {
2238            database
2239        } else {
2240            log::warn!(
2241                "Cannot snapshot order state for {} (no database configured)",
2242                order.client_order_id()
2243            );
2244            return Ok(());
2245        };
2246
2247        database.snapshot_order_state(order)
2248    }
2249
2250    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2251
2252    fn build_order_query_filter_set(
2253        &self,
2254        venue: Option<&Venue>,
2255        instrument_id: Option<&InstrumentId>,
2256        strategy_id: Option<&StrategyId>,
2257        account_id: Option<&AccountId>,
2258    ) -> Option<AHashSet<ClientOrderId>> {
2259        let mut query: Option<AHashSet<ClientOrderId>> = None;
2260
2261        if let Some(venue) = venue {
2262            query = Some(
2263                self.index
2264                    .venue_orders
2265                    .get(venue)
2266                    .cloned()
2267                    .unwrap_or_default(),
2268            );
2269        }
2270
2271        if let Some(instrument_id) = instrument_id {
2272            let instrument_orders = self
2273                .index
2274                .instrument_orders
2275                .get(instrument_id)
2276                .cloned()
2277                .unwrap_or_default();
2278
2279            if let Some(existing_query) = &mut query {
2280                *existing_query = existing_query
2281                    .intersection(&instrument_orders)
2282                    .copied()
2283                    .collect();
2284            } else {
2285                query = Some(instrument_orders);
2286            }
2287        }
2288
2289        if let Some(strategy_id) = strategy_id {
2290            let strategy_orders = self
2291                .index
2292                .strategy_orders
2293                .get(strategy_id)
2294                .cloned()
2295                .unwrap_or_default();
2296
2297            if let Some(existing_query) = &mut query {
2298                *existing_query = existing_query
2299                    .intersection(&strategy_orders)
2300                    .copied()
2301                    .collect();
2302            } else {
2303                query = Some(strategy_orders);
2304            }
2305        }
2306
2307        if let Some(account_id) = account_id {
2308            let account_orders = self
2309                .index
2310                .account_orders
2311                .get(account_id)
2312                .cloned()
2313                .unwrap_or_default();
2314
2315            if let Some(existing_query) = &mut query {
2316                *existing_query = existing_query
2317                    .intersection(&account_orders)
2318                    .copied()
2319                    .collect();
2320            } else {
2321                query = Some(account_orders);
2322            }
2323        }
2324
2325        query
2326    }
2327
2328    fn build_position_query_filter_set(
2329        &self,
2330        venue: Option<&Venue>,
2331        instrument_id: Option<&InstrumentId>,
2332        strategy_id: Option<&StrategyId>,
2333        account_id: Option<&AccountId>,
2334    ) -> Option<AHashSet<PositionId>> {
2335        let mut query: Option<AHashSet<PositionId>> = None;
2336
2337        if let Some(venue) = venue {
2338            query = Some(
2339                self.index
2340                    .venue_positions
2341                    .get(venue)
2342                    .cloned()
2343                    .unwrap_or_default(),
2344            );
2345        }
2346
2347        if let Some(instrument_id) = instrument_id {
2348            let instrument_positions = self
2349                .index
2350                .instrument_positions
2351                .get(instrument_id)
2352                .cloned()
2353                .unwrap_or_default();
2354
2355            if let Some(existing_query) = query {
2356                query = Some(
2357                    existing_query
2358                        .intersection(&instrument_positions)
2359                        .copied()
2360                        .collect(),
2361                );
2362            } else {
2363                query = Some(instrument_positions);
2364            }
2365        }
2366
2367        if let Some(strategy_id) = strategy_id {
2368            let strategy_positions = self
2369                .index
2370                .strategy_positions
2371                .get(strategy_id)
2372                .cloned()
2373                .unwrap_or_default();
2374
2375            if let Some(existing_query) = query {
2376                query = Some(
2377                    existing_query
2378                        .intersection(&strategy_positions)
2379                        .copied()
2380                        .collect(),
2381                );
2382            } else {
2383                query = Some(strategy_positions);
2384            }
2385        }
2386
2387        if let Some(account_id) = account_id {
2388            let account_positions = self
2389                .index
2390                .account_positions
2391                .get(account_id)
2392                .cloned()
2393                .unwrap_or_default();
2394
2395            if let Some(existing_query) = query {
2396                query = Some(
2397                    existing_query
2398                        .intersection(&account_positions)
2399                        .copied()
2400                        .collect(),
2401                );
2402            } else {
2403                query = Some(account_positions);
2404            }
2405        }
2406
2407        query
2408    }
2409
2410    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2411    ///
2412    /// # Panics
2413    ///
2414    /// Panics if any `client_order_id` in the set is not found in the cache.
2415    fn get_orders_for_ids(
2416        &self,
2417        client_order_ids: &AHashSet<ClientOrderId>,
2418        side: Option<OrderSide>,
2419    ) -> Vec<&OrderAny> {
2420        let side = side.unwrap_or(OrderSide::NoOrderSide);
2421        let mut orders = Vec::new();
2422
2423        for client_order_id in client_order_ids {
2424            let order = self
2425                .orders
2426                .get(client_order_id)
2427                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2428
2429            if side == OrderSide::NoOrderSide || side == order.order_side() {
2430                orders.push(order);
2431            }
2432        }
2433
2434        orders
2435    }
2436
2437    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2438    ///
2439    /// # Panics
2440    ///
2441    /// Panics if any `position_id` in the set is not found in the cache.
2442    fn get_positions_for_ids(
2443        &self,
2444        position_ids: &AHashSet<PositionId>,
2445        side: Option<PositionSide>,
2446    ) -> Vec<&Position> {
2447        let side = side.unwrap_or(PositionSide::NoPositionSide);
2448        let mut positions = Vec::new();
2449
2450        for position_id in position_ids {
2451            let position = self
2452                .positions
2453                .get(position_id)
2454                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2455
2456            if side == PositionSide::NoPositionSide || side == position.side {
2457                positions.push(position);
2458            }
2459        }
2460
2461        positions
2462    }
2463
2464    /// Returns the `ClientOrderId`s of all orders.
2465    #[must_use]
2466    pub fn client_order_ids(
2467        &self,
2468        venue: Option<&Venue>,
2469        instrument_id: Option<&InstrumentId>,
2470        strategy_id: Option<&StrategyId>,
2471        account_id: Option<&AccountId>,
2472    ) -> AHashSet<ClientOrderId> {
2473        let query =
2474            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2475        match query {
2476            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2477            None => self.index.orders.clone(),
2478        }
2479    }
2480
2481    /// Returns the `ClientOrderId`s of all open orders.
2482    #[must_use]
2483    pub fn client_order_ids_open(
2484        &self,
2485        venue: Option<&Venue>,
2486        instrument_id: Option<&InstrumentId>,
2487        strategy_id: Option<&StrategyId>,
2488        account_id: Option<&AccountId>,
2489    ) -> AHashSet<ClientOrderId> {
2490        let query =
2491            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2492        match query {
2493            Some(query) => self
2494                .index
2495                .orders_open
2496                .intersection(&query)
2497                .copied()
2498                .collect(),
2499            None => self.index.orders_open.clone(),
2500        }
2501    }
2502
2503    /// Returns the `ClientOrderId`s of all closed orders.
2504    #[must_use]
2505    pub fn client_order_ids_closed(
2506        &self,
2507        venue: Option<&Venue>,
2508        instrument_id: Option<&InstrumentId>,
2509        strategy_id: Option<&StrategyId>,
2510        account_id: Option<&AccountId>,
2511    ) -> AHashSet<ClientOrderId> {
2512        let query =
2513            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2514        match query {
2515            Some(query) => self
2516                .index
2517                .orders_closed
2518                .intersection(&query)
2519                .copied()
2520                .collect(),
2521            None => self.index.orders_closed.clone(),
2522        }
2523    }
2524
2525    /// Returns the `ClientOrderId`s of all emulated orders.
2526    #[must_use]
2527    pub fn client_order_ids_emulated(
2528        &self,
2529        venue: Option<&Venue>,
2530        instrument_id: Option<&InstrumentId>,
2531        strategy_id: Option<&StrategyId>,
2532        account_id: Option<&AccountId>,
2533    ) -> AHashSet<ClientOrderId> {
2534        let query =
2535            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2536        match query {
2537            Some(query) => self
2538                .index
2539                .orders_emulated
2540                .intersection(&query)
2541                .copied()
2542                .collect(),
2543            None => self.index.orders_emulated.clone(),
2544        }
2545    }
2546
2547    /// Returns the `ClientOrderId`s of all in-flight orders.
2548    #[must_use]
2549    pub fn client_order_ids_inflight(
2550        &self,
2551        venue: Option<&Venue>,
2552        instrument_id: Option<&InstrumentId>,
2553        strategy_id: Option<&StrategyId>,
2554        account_id: Option<&AccountId>,
2555    ) -> AHashSet<ClientOrderId> {
2556        let query =
2557            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2558        match query {
2559            Some(query) => self
2560                .index
2561                .orders_inflight
2562                .intersection(&query)
2563                .copied()
2564                .collect(),
2565            None => self.index.orders_inflight.clone(),
2566        }
2567    }
2568
2569    /// Returns `PositionId`s of all positions.
2570    #[must_use]
2571    pub fn position_ids(
2572        &self,
2573        venue: Option<&Venue>,
2574        instrument_id: Option<&InstrumentId>,
2575        strategy_id: Option<&StrategyId>,
2576        account_id: Option<&AccountId>,
2577    ) -> AHashSet<PositionId> {
2578        let query =
2579            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2580        match query {
2581            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2582            None => self.index.positions.clone(),
2583        }
2584    }
2585
2586    /// Returns the `PositionId`s of all open positions.
2587    #[must_use]
2588    pub fn position_open_ids(
2589        &self,
2590        venue: Option<&Venue>,
2591        instrument_id: Option<&InstrumentId>,
2592        strategy_id: Option<&StrategyId>,
2593        account_id: Option<&AccountId>,
2594    ) -> AHashSet<PositionId> {
2595        let query =
2596            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2597        match query {
2598            Some(query) => self
2599                .index
2600                .positions_open
2601                .intersection(&query)
2602                .copied()
2603                .collect(),
2604            None => self.index.positions_open.clone(),
2605        }
2606    }
2607
2608    /// Returns the `PositionId`s of all closed positions.
2609    #[must_use]
2610    pub fn position_closed_ids(
2611        &self,
2612        venue: Option<&Venue>,
2613        instrument_id: Option<&InstrumentId>,
2614        strategy_id: Option<&StrategyId>,
2615        account_id: Option<&AccountId>,
2616    ) -> AHashSet<PositionId> {
2617        let query =
2618            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2619        match query {
2620            Some(query) => self
2621                .index
2622                .positions_closed
2623                .intersection(&query)
2624                .copied()
2625                .collect(),
2626            None => self.index.positions_closed.clone(),
2627        }
2628    }
2629
2630    /// Returns the `ComponentId`s of all actors.
2631    #[must_use]
2632    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2633        self.index.actors.clone()
2634    }
2635
2636    /// Returns the `StrategyId`s of all strategies.
2637    #[must_use]
2638    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2639        self.index.strategies.clone()
2640    }
2641
2642    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2643    #[must_use]
2644    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2645        self.index.exec_algorithms.clone()
2646    }
2647
2648    // -- ORDER QUERIES ---------------------------------------------------------------------------
2649
2650    /// Gets a reference to the order with the `client_order_id` (if found).
2651    #[must_use]
2652    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2653        self.orders.get(client_order_id)
2654    }
2655
2656    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
2657    #[must_use]
2658    pub fn orders_for_ids(
2659        &self,
2660        client_order_ids: &[ClientOrderId],
2661        context: &dyn Display,
2662    ) -> Vec<OrderAny> {
2663        let mut orders = Vec::with_capacity(client_order_ids.len());
2664        for id in client_order_ids {
2665            match self.orders.get(id) {
2666                Some(order) => orders.push(order.clone()),
2667                None => log::error!("Order {id} not found in cache for {context}"),
2668            }
2669        }
2670        orders
2671    }
2672
2673    /// Gets a reference to the order with the `client_order_id` (if found).
2674    #[must_use]
2675    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2676        self.orders.get_mut(client_order_id)
2677    }
2678
2679    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2680    #[must_use]
2681    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2682        self.index.venue_order_ids.get(venue_order_id)
2683    }
2684
2685    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2686    #[must_use]
2687    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2688        self.index.client_order_ids.get(client_order_id)
2689    }
2690
2691    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2692    #[must_use]
2693    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2694        self.index.order_client.get(client_order_id)
2695    }
2696
2697    /// Returns references to all orders matching the optional filter parameters.
2698    #[must_use]
2699    pub fn orders(
2700        &self,
2701        venue: Option<&Venue>,
2702        instrument_id: Option<&InstrumentId>,
2703        strategy_id: Option<&StrategyId>,
2704        account_id: Option<&AccountId>,
2705        side: Option<OrderSide>,
2706    ) -> Vec<&OrderAny> {
2707        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2708        self.get_orders_for_ids(&client_order_ids, side)
2709    }
2710
2711    /// Returns references to all open orders matching the optional filter parameters.
2712    #[must_use]
2713    pub fn orders_open(
2714        &self,
2715        venue: Option<&Venue>,
2716        instrument_id: Option<&InstrumentId>,
2717        strategy_id: Option<&StrategyId>,
2718        account_id: Option<&AccountId>,
2719        side: Option<OrderSide>,
2720    ) -> Vec<&OrderAny> {
2721        let client_order_ids =
2722            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
2723        self.get_orders_for_ids(&client_order_ids, side)
2724    }
2725
2726    /// Returns references to all closed orders matching the optional filter parameters.
2727    #[must_use]
2728    pub fn orders_closed(
2729        &self,
2730        venue: Option<&Venue>,
2731        instrument_id: Option<&InstrumentId>,
2732        strategy_id: Option<&StrategyId>,
2733        account_id: Option<&AccountId>,
2734        side: Option<OrderSide>,
2735    ) -> Vec<&OrderAny> {
2736        let client_order_ids =
2737            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
2738        self.get_orders_for_ids(&client_order_ids, side)
2739    }
2740
2741    /// Returns references to all emulated orders matching the optional filter parameters.
2742    #[must_use]
2743    pub fn orders_emulated(
2744        &self,
2745        venue: Option<&Venue>,
2746        instrument_id: Option<&InstrumentId>,
2747        strategy_id: Option<&StrategyId>,
2748        account_id: Option<&AccountId>,
2749        side: Option<OrderSide>,
2750    ) -> Vec<&OrderAny> {
2751        let client_order_ids =
2752            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
2753        self.get_orders_for_ids(&client_order_ids, side)
2754    }
2755
2756    /// Returns references to all in-flight orders matching the optional filter parameters.
2757    #[must_use]
2758    pub fn orders_inflight(
2759        &self,
2760        venue: Option<&Venue>,
2761        instrument_id: Option<&InstrumentId>,
2762        strategy_id: Option<&StrategyId>,
2763        account_id: Option<&AccountId>,
2764        side: Option<OrderSide>,
2765    ) -> Vec<&OrderAny> {
2766        let client_order_ids =
2767            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
2768        self.get_orders_for_ids(&client_order_ids, side)
2769    }
2770
2771    /// Returns references to all orders for the `position_id`.
2772    #[must_use]
2773    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
2774        let client_order_ids = self.index.position_orders.get(position_id);
2775        match client_order_ids {
2776            Some(client_order_ids) => {
2777                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
2778            }
2779            None => Vec::new(),
2780        }
2781    }
2782
2783    /// Returns whether an order with the `client_order_id` exists.
2784    #[must_use]
2785    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
2786        self.index.orders.contains(client_order_id)
2787    }
2788
2789    /// Returns whether an order with the `client_order_id` is open.
2790    #[must_use]
2791    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
2792        self.index.orders_open.contains(client_order_id)
2793    }
2794
2795    /// Returns whether an order with the `client_order_id` is closed.
2796    #[must_use]
2797    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
2798        self.index.orders_closed.contains(client_order_id)
2799    }
2800
2801    /// Returns whether an order with the `client_order_id` is emulated.
2802    #[must_use]
2803    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
2804        self.index.orders_emulated.contains(client_order_id)
2805    }
2806
2807    /// Returns whether an order with the `client_order_id` is in-flight.
2808    #[must_use]
2809    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
2810        self.index.orders_inflight.contains(client_order_id)
2811    }
2812
2813    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
2814    #[must_use]
2815    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
2816        self.index.orders_pending_cancel.contains(client_order_id)
2817    }
2818
2819    /// Returns the count of all open orders.
2820    #[must_use]
2821    pub fn orders_open_count(
2822        &self,
2823        venue: Option<&Venue>,
2824        instrument_id: Option<&InstrumentId>,
2825        strategy_id: Option<&StrategyId>,
2826        account_id: Option<&AccountId>,
2827        side: Option<OrderSide>,
2828    ) -> usize {
2829        self.orders_open(venue, instrument_id, strategy_id, account_id, side)
2830            .len()
2831    }
2832
2833    /// Returns the count of all closed orders.
2834    #[must_use]
2835    pub fn orders_closed_count(
2836        &self,
2837        venue: Option<&Venue>,
2838        instrument_id: Option<&InstrumentId>,
2839        strategy_id: Option<&StrategyId>,
2840        account_id: Option<&AccountId>,
2841        side: Option<OrderSide>,
2842    ) -> usize {
2843        self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
2844            .len()
2845    }
2846
2847    /// Returns the count of all emulated orders.
2848    #[must_use]
2849    pub fn orders_emulated_count(
2850        &self,
2851        venue: Option<&Venue>,
2852        instrument_id: Option<&InstrumentId>,
2853        strategy_id: Option<&StrategyId>,
2854        account_id: Option<&AccountId>,
2855        side: Option<OrderSide>,
2856    ) -> usize {
2857        self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
2858            .len()
2859    }
2860
2861    /// Returns the count of all in-flight orders.
2862    #[must_use]
2863    pub fn orders_inflight_count(
2864        &self,
2865        venue: Option<&Venue>,
2866        instrument_id: Option<&InstrumentId>,
2867        strategy_id: Option<&StrategyId>,
2868        account_id: Option<&AccountId>,
2869        side: Option<OrderSide>,
2870    ) -> usize {
2871        self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
2872            .len()
2873    }
2874
2875    /// Returns the count of all orders.
2876    #[must_use]
2877    pub fn orders_total_count(
2878        &self,
2879        venue: Option<&Venue>,
2880        instrument_id: Option<&InstrumentId>,
2881        strategy_id: Option<&StrategyId>,
2882        account_id: Option<&AccountId>,
2883        side: Option<OrderSide>,
2884    ) -> usize {
2885        self.orders(venue, instrument_id, strategy_id, account_id, side)
2886            .len()
2887    }
2888
2889    /// Returns the order list for the `order_list_id`.
2890    #[must_use]
2891    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
2892        self.order_lists.get(order_list_id)
2893    }
2894
2895    /// Returns all order lists matching the optional filter parameters.
2896    #[must_use]
2897    pub fn order_lists(
2898        &self,
2899        venue: Option<&Venue>,
2900        instrument_id: Option<&InstrumentId>,
2901        strategy_id: Option<&StrategyId>,
2902        account_id: Option<&AccountId>,
2903    ) -> Vec<&OrderList> {
2904        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
2905
2906        if let Some(venue) = venue {
2907            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
2908        }
2909
2910        if let Some(instrument_id) = instrument_id {
2911            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
2912        }
2913
2914        if let Some(strategy_id) = strategy_id {
2915            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
2916        }
2917
2918        if let Some(account_id) = account_id {
2919            order_lists.retain(|ol| {
2920                ol.client_order_ids.iter().any(|client_order_id| {
2921                    self.orders
2922                        .get(client_order_id)
2923                        .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
2924                })
2925            });
2926        }
2927
2928        order_lists
2929    }
2930
2931    /// Returns whether an order list with the `order_list_id` exists.
2932    #[must_use]
2933    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
2934        self.order_lists.contains_key(order_list_id)
2935    }
2936
2937    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
2938
2939    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
2940    /// optional filter parameters.
2941    #[must_use]
2942    pub fn orders_for_exec_algorithm(
2943        &self,
2944        exec_algorithm_id: &ExecAlgorithmId,
2945        venue: Option<&Venue>,
2946        instrument_id: Option<&InstrumentId>,
2947        strategy_id: Option<&StrategyId>,
2948        account_id: Option<&AccountId>,
2949        side: Option<OrderSide>,
2950    ) -> Vec<&OrderAny> {
2951        let query =
2952            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2953        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
2954
2955        if let Some(query) = query
2956            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
2957        {
2958            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
2959        }
2960
2961        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
2962            self.get_orders_for_ids(exec_algorithm_order_ids, side)
2963        } else {
2964            Vec::new()
2965        }
2966    }
2967
2968    /// Returns references to all orders with the `exec_spawn_id`.
2969    #[must_use]
2970    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
2971        self.get_orders_for_ids(
2972            self.index
2973                .exec_spawn_orders
2974                .get(exec_spawn_id)
2975                .unwrap_or(&AHashSet::new()),
2976            None,
2977        )
2978    }
2979
2980    /// Returns the total order quantity for the `exec_spawn_id`.
2981    #[must_use]
2982    pub fn exec_spawn_total_quantity(
2983        &self,
2984        exec_spawn_id: &ClientOrderId,
2985        active_only: bool,
2986    ) -> Option<Quantity> {
2987        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
2988
2989        let mut total_quantity: Option<Quantity> = None;
2990
2991        for spawn_order in exec_spawn_orders {
2992            if active_only && spawn_order.is_closed() {
2993                continue;
2994            }
2995
2996            match total_quantity.as_mut() {
2997                Some(total) => *total = *total + spawn_order.quantity(),
2998                None => total_quantity = Some(spawn_order.quantity()),
2999            }
3000        }
3001
3002        total_quantity
3003    }
3004
3005    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
3006    #[must_use]
3007    pub fn exec_spawn_total_filled_qty(
3008        &self,
3009        exec_spawn_id: &ClientOrderId,
3010        active_only: bool,
3011    ) -> Option<Quantity> {
3012        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3013
3014        let mut total_quantity: Option<Quantity> = None;
3015
3016        for spawn_order in exec_spawn_orders {
3017            if active_only && spawn_order.is_closed() {
3018                continue;
3019            }
3020
3021            match total_quantity.as_mut() {
3022                Some(total) => *total = *total + spawn_order.filled_qty(),
3023                None => total_quantity = Some(spawn_order.filled_qty()),
3024            }
3025        }
3026
3027        total_quantity
3028    }
3029
3030    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
3031    #[must_use]
3032    pub fn exec_spawn_total_leaves_qty(
3033        &self,
3034        exec_spawn_id: &ClientOrderId,
3035        active_only: bool,
3036    ) -> Option<Quantity> {
3037        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3038
3039        let mut total_quantity: Option<Quantity> = None;
3040
3041        for spawn_order in exec_spawn_orders {
3042            if active_only && spawn_order.is_closed() {
3043                continue;
3044            }
3045
3046            match total_quantity.as_mut() {
3047                Some(total) => *total = *total + spawn_order.leaves_qty(),
3048                None => total_quantity = Some(spawn_order.leaves_qty()),
3049            }
3050        }
3051
3052        total_quantity
3053    }
3054
3055    // -- POSITION QUERIES ------------------------------------------------------------------------
3056
3057    /// Returns a reference to the position with the `position_id` (if found).
3058    #[must_use]
3059    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3060        self.positions.get(position_id)
3061    }
3062
3063    /// Returns a reference to the position for the `client_order_id` (if found).
3064    #[must_use]
3065    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3066        self.index
3067            .order_position
3068            .get(client_order_id)
3069            .and_then(|position_id| self.positions.get(position_id))
3070    }
3071
3072    /// Returns a reference to the position ID for the `client_order_id` (if found).
3073    #[must_use]
3074    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3075        self.index.order_position.get(client_order_id)
3076    }
3077
3078    /// Returns a reference to all positions matching the optional filter parameters.
3079    #[must_use]
3080    pub fn positions(
3081        &self,
3082        venue: Option<&Venue>,
3083        instrument_id: Option<&InstrumentId>,
3084        strategy_id: Option<&StrategyId>,
3085        account_id: Option<&AccountId>,
3086        side: Option<PositionSide>,
3087    ) -> Vec<&Position> {
3088        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3089        self.get_positions_for_ids(&position_ids, side)
3090    }
3091
3092    /// Returns a reference to all open positions matching the optional filter parameters.
3093    #[must_use]
3094    pub fn positions_open(
3095        &self,
3096        venue: Option<&Venue>,
3097        instrument_id: Option<&InstrumentId>,
3098        strategy_id: Option<&StrategyId>,
3099        account_id: Option<&AccountId>,
3100        side: Option<PositionSide>,
3101    ) -> Vec<&Position> {
3102        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3103        self.get_positions_for_ids(&position_ids, side)
3104    }
3105
3106    /// Returns a reference to all closed positions matching the optional filter parameters.
3107    #[must_use]
3108    pub fn positions_closed(
3109        &self,
3110        venue: Option<&Venue>,
3111        instrument_id: Option<&InstrumentId>,
3112        strategy_id: Option<&StrategyId>,
3113        account_id: Option<&AccountId>,
3114        side: Option<PositionSide>,
3115    ) -> Vec<&Position> {
3116        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3117        self.get_positions_for_ids(&position_ids, side)
3118    }
3119
3120    /// Returns whether a position with the `position_id` exists.
3121    #[must_use]
3122    pub fn position_exists(&self, position_id: &PositionId) -> bool {
3123        self.index.positions.contains(position_id)
3124    }
3125
3126    /// Returns whether a position with the `position_id` is open.
3127    #[must_use]
3128    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3129        self.index.positions_open.contains(position_id)
3130    }
3131
3132    /// Returns whether a position with the `position_id` is closed.
3133    #[must_use]
3134    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3135        self.index.positions_closed.contains(position_id)
3136    }
3137
3138    /// Returns the count of all open positions.
3139    #[must_use]
3140    pub fn positions_open_count(
3141        &self,
3142        venue: Option<&Venue>,
3143        instrument_id: Option<&InstrumentId>,
3144        strategy_id: Option<&StrategyId>,
3145        account_id: Option<&AccountId>,
3146        side: Option<PositionSide>,
3147    ) -> usize {
3148        self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3149            .len()
3150    }
3151
3152    /// Returns the count of all closed positions.
3153    #[must_use]
3154    pub fn positions_closed_count(
3155        &self,
3156        venue: Option<&Venue>,
3157        instrument_id: Option<&InstrumentId>,
3158        strategy_id: Option<&StrategyId>,
3159        account_id: Option<&AccountId>,
3160        side: Option<PositionSide>,
3161    ) -> usize {
3162        self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3163            .len()
3164    }
3165
3166    /// Returns the count of all positions.
3167    #[must_use]
3168    pub fn positions_total_count(
3169        &self,
3170        venue: Option<&Venue>,
3171        instrument_id: Option<&InstrumentId>,
3172        strategy_id: Option<&StrategyId>,
3173        account_id: Option<&AccountId>,
3174        side: Option<PositionSide>,
3175    ) -> usize {
3176        self.positions(venue, instrument_id, strategy_id, account_id, side)
3177            .len()
3178    }
3179
3180    // -- STRATEGY QUERIES ------------------------------------------------------------------------
3181
3182    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
3183    #[must_use]
3184    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3185        self.index.order_strategy.get(client_order_id)
3186    }
3187
3188    /// Gets a reference to the strategy ID for the `position_id` (if found).
3189    #[must_use]
3190    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3191        self.index.position_strategy.get(position_id)
3192    }
3193
3194    // -- GENERAL ---------------------------------------------------------------------------------
3195
3196    /// Gets a reference to the general value for the `key` (if found).
3197    ///
3198    /// # Errors
3199    ///
3200    /// Returns an error if the `key` is invalid.
3201    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3202        check_valid_string_ascii(key, stringify!(key))?;
3203
3204        Ok(self.general.get(key))
3205    }
3206
3207    // -- DATA QUERIES ----------------------------------------------------------------------------
3208
3209    /// Returns the price for the `instrument_id` and `price_type` (if found).
3210    #[must_use]
3211    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3212        match price_type {
3213            PriceType::Bid => self
3214                .quotes
3215                .get(instrument_id)
3216                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3217            PriceType::Ask => self
3218                .quotes
3219                .get(instrument_id)
3220                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3221            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3222                quotes.front().map(|quote| {
3223                    Price::new(
3224                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3225                        quote.bid_price.precision + 1,
3226                    )
3227                })
3228            }),
3229            PriceType::Last => self
3230                .trades
3231                .get(instrument_id)
3232                .and_then(|trades| trades.front().map(|trade| trade.price)),
3233            PriceType::Mark => self
3234                .mark_prices
3235                .get(instrument_id)
3236                .and_then(|marks| marks.front().map(|mark| mark.value)),
3237        }
3238    }
3239
3240    /// Gets all quotes for the `instrument_id`.
3241    #[must_use]
3242    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3243        self.quotes
3244            .get(instrument_id)
3245            .map(|quotes| quotes.iter().copied().collect())
3246    }
3247
3248    /// Gets all trades for the `instrument_id`.
3249    #[must_use]
3250    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3251        self.trades
3252            .get(instrument_id)
3253            .map(|trades| trades.iter().copied().collect())
3254    }
3255
3256    /// Gets all mark price updates for the `instrument_id`.
3257    #[must_use]
3258    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3259        self.mark_prices
3260            .get(instrument_id)
3261            .map(|mark_prices| mark_prices.iter().copied().collect())
3262    }
3263
3264    /// Gets all index price updates for the `instrument_id`.
3265    #[must_use]
3266    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3267        self.index_prices
3268            .get(instrument_id)
3269            .map(|index_prices| index_prices.iter().copied().collect())
3270    }
3271
3272    /// Gets all funding rate updates for the `instrument_id`.
3273    #[must_use]
3274    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3275        self.funding_rates
3276            .get(instrument_id)
3277            .map(|funding_rates| funding_rates.iter().copied().collect())
3278    }
3279
3280    /// Gets all bars for the `bar_type`.
3281    #[must_use]
3282    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3283        self.bars
3284            .get(bar_type)
3285            .map(|bars| bars.iter().copied().collect())
3286    }
3287
3288    /// Gets a reference to the order book for the `instrument_id`.
3289    #[must_use]
3290    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3291        self.books.get(instrument_id)
3292    }
3293
3294    /// Gets a reference to the order book for the `instrument_id`.
3295    #[must_use]
3296    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3297        self.books.get_mut(instrument_id)
3298    }
3299
3300    /// Gets a reference to the own order book for the `instrument_id`.
3301    #[must_use]
3302    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3303        self.own_books.get(instrument_id)
3304    }
3305
3306    /// Gets a reference to the own order book for the `instrument_id`.
3307    #[must_use]
3308    pub fn own_order_book_mut(
3309        &mut self,
3310        instrument_id: &InstrumentId,
3311    ) -> Option<&mut OwnOrderBook> {
3312        self.own_books.get_mut(instrument_id)
3313    }
3314
3315    /// Gets a reference to the latest quote for the `instrument_id`.
3316    #[must_use]
3317    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3318        self.quotes
3319            .get(instrument_id)
3320            .and_then(|quotes| quotes.front())
3321    }
3322
3323    /// Gets a reference to the latest trade for the `instrument_id`.
3324    #[must_use]
3325    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3326        self.trades
3327            .get(instrument_id)
3328            .and_then(|trades| trades.front())
3329    }
3330
3331    /// Gets a reference to the latest mark price update for the `instrument_id`.
3332    #[must_use]
3333    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3334        self.mark_prices
3335            .get(instrument_id)
3336            .and_then(|mark_prices| mark_prices.front())
3337    }
3338
3339    /// Gets a reference to the latest index price update for the `instrument_id`.
3340    #[must_use]
3341    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3342        self.index_prices
3343            .get(instrument_id)
3344            .and_then(|index_prices| index_prices.front())
3345    }
3346
3347    /// Gets a reference to the latest funding rate update for the `instrument_id`.
3348    #[must_use]
3349    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3350        self.funding_rates
3351            .get(instrument_id)
3352            .and_then(|funding_rates| funding_rates.front())
3353    }
3354
3355    /// Gets a reference to the latest bar for the `bar_type`.
3356    #[must_use]
3357    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3358        self.bars.get(bar_type).and_then(|bars| bars.front())
3359    }
3360
3361    /// Gets the order book update count for the `instrument_id`.
3362    #[must_use]
3363    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3364        self.books
3365            .get(instrument_id)
3366            .map_or(0, |book| book.update_count) as usize
3367    }
3368
3369    /// Gets the quote tick count for the `instrument_id`.
3370    #[must_use]
3371    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3372        self.quotes
3373            .get(instrument_id)
3374            .map_or(0, std::collections::VecDeque::len)
3375    }
3376
3377    /// Gets the trade tick count for the `instrument_id`.
3378    #[must_use]
3379    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3380        self.trades
3381            .get(instrument_id)
3382            .map_or(0, std::collections::VecDeque::len)
3383    }
3384
3385    /// Gets the bar count for the `instrument_id`.
3386    #[must_use]
3387    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3388        self.bars
3389            .get(bar_type)
3390            .map_or(0, std::collections::VecDeque::len)
3391    }
3392
3393    /// Returns whether the cache contains an order book for the `instrument_id`.
3394    #[must_use]
3395    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3396        self.books.contains_key(instrument_id)
3397    }
3398
3399    /// Returns whether the cache contains quotes for the `instrument_id`.
3400    #[must_use]
3401    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3402        self.quote_count(instrument_id) > 0
3403    }
3404
3405    /// Returns whether the cache contains trades for the `instrument_id`.
3406    #[must_use]
3407    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3408        self.trade_count(instrument_id) > 0
3409    }
3410
3411    /// Returns whether the cache contains bars for the `bar_type`.
3412    #[must_use]
3413    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3414        self.bar_count(bar_type) > 0
3415    }
3416
3417    #[must_use]
3418    pub fn get_xrate(
3419        &self,
3420        venue: Venue,
3421        from_currency: Currency,
3422        to_currency: Currency,
3423        price_type: PriceType,
3424    ) -> Option<f64> {
3425        if from_currency == to_currency {
3426            // When the source and target currencies are identical,
3427            // no conversion is needed; return an exchange rate of 1.0.
3428            return Some(1.0);
3429        }
3430
3431        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3432
3433        match get_exchange_rate(
3434            from_currency.code,
3435            to_currency.code,
3436            price_type,
3437            bid_quote,
3438            ask_quote,
3439        ) {
3440            Ok(rate) => rate,
3441            Err(e) => {
3442                log::error!("Failed to calculate xrate: {e}");
3443                None
3444            }
3445        }
3446    }
3447
3448    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3449        let mut bid_quotes = AHashMap::new();
3450        let mut ask_quotes = AHashMap::new();
3451
3452        for instrument_id in self.instruments.keys() {
3453            if instrument_id.venue != *venue {
3454                continue;
3455            }
3456
3457            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3458                if let Some(tick) = ticks.front() {
3459                    (tick.bid_price, tick.ask_price)
3460                } else {
3461                    continue; // Empty ticks vector
3462                }
3463            } else {
3464                let bid_bar = self
3465                    .bars
3466                    .iter()
3467                    .find(|(k, _)| {
3468                        k.instrument_id() == *instrument_id
3469                            && matches!(k.spec().price_type, PriceType::Bid)
3470                    })
3471                    .map(|(_, v)| v);
3472
3473                let ask_bar = self
3474                    .bars
3475                    .iter()
3476                    .find(|(k, _)| {
3477                        k.instrument_id() == *instrument_id
3478                            && matches!(k.spec().price_type, PriceType::Ask)
3479                    })
3480                    .map(|(_, v)| v);
3481
3482                match (bid_bar, ask_bar) {
3483                    (Some(bid), Some(ask)) => {
3484                        match (bid.front(), ask.front()) {
3485                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3486                            _ => {
3487                                // Empty bar VecDeques
3488                                continue;
3489                            }
3490                        }
3491                    }
3492                    _ => continue,
3493                }
3494            };
3495
3496            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3497            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3498        }
3499
3500        (bid_quotes, ask_quotes)
3501    }
3502
3503    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3504    #[must_use]
3505    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3506        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3507    }
3508
3509    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3510    ///
3511    /// # Panics
3512    ///
3513    /// Panics if `xrate` is not positive.
3514    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3515        assert!(xrate > 0.0, "xrate was zero");
3516        self.mark_xrates.insert((from_currency, to_currency), xrate);
3517        self.mark_xrates
3518            .insert((to_currency, from_currency), 1.0 / xrate);
3519    }
3520
3521    /// Clears the mark exchange rate for the given currency pair.
3522    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3523        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3524    }
3525
3526    /// Clears all mark exchange rates.
3527    pub fn clear_mark_xrates(&mut self) {
3528        self.mark_xrates.clear();
3529    }
3530
3531    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3532
3533    /// Returns a reference to the instrument for the `instrument_id` (if found).
3534    #[must_use]
3535    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3536        self.instruments.get(instrument_id)
3537    }
3538
3539    /// Returns references to all instrument IDs for the `venue`.
3540    #[must_use]
3541    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3542        match venue {
3543            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3544            None => self.instruments.keys().collect(),
3545        }
3546    }
3547
3548    /// Returns references to all instruments for the `venue`.
3549    #[must_use]
3550    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3551        self.instruments
3552            .values()
3553            .filter(|i| &i.id().venue == venue)
3554            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3555            .collect()
3556    }
3557
3558    /// Returns references to all bar types contained in the cache.
3559    #[must_use]
3560    pub fn bar_types(
3561        &self,
3562        instrument_id: Option<&InstrumentId>,
3563        price_type: Option<&PriceType>,
3564        aggregation_source: AggregationSource,
3565    ) -> Vec<&BarType> {
3566        let mut bar_types = self
3567            .bars
3568            .keys()
3569            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3570            .collect::<Vec<&BarType>>();
3571
3572        if let Some(instrument_id) = instrument_id {
3573            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3574        }
3575
3576        if let Some(price_type) = price_type {
3577            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3578        }
3579
3580        bar_types
3581    }
3582
3583    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3584
3585    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3586    #[must_use]
3587    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3588        self.synthetics.get(instrument_id)
3589    }
3590
3591    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3592    #[must_use]
3593    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3594        self.synthetics.keys().collect()
3595    }
3596
3597    /// Returns references to all synthetic instruments contained in the cache.
3598    #[must_use]
3599    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3600        self.synthetics.values().collect()
3601    }
3602
3603    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3604
3605    /// Returns a reference to the account for the `account_id` (if found).
3606    #[must_use]
3607    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3608        self.accounts.get(account_id)
3609    }
3610
3611    /// Returns a reference to the account for the `venue` (if found).
3612    #[must_use]
3613    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3614        self.index
3615            .venue_account
3616            .get(venue)
3617            .and_then(|account_id| self.accounts.get(account_id))
3618    }
3619
3620    /// Returns a reference to the account ID for the `venue` (if found).
3621    #[must_use]
3622    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
3623        self.index.venue_account.get(venue)
3624    }
3625
3626    /// Returns references to all accounts for the `account_id`.
3627    #[must_use]
3628    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
3629        self.accounts
3630            .values()
3631            .filter(|account| &account.id() == account_id)
3632            .collect()
3633    }
3634
3635    /// Updates the own order book with an order.
3636    ///
3637    /// This method adds, updates, or removes an order from the own order book
3638    /// based on the order's current state.
3639    ///
3640    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
3641    /// represented in own books.
3642    pub fn update_own_order_book(&mut self, order: &OrderAny) {
3643        if !order.has_price() {
3644            return;
3645        }
3646
3647        let instrument_id = order.instrument_id();
3648
3649        let own_book = self
3650            .own_books
3651            .entry(instrument_id)
3652            .or_insert_with(|| OwnOrderBook::new(instrument_id));
3653
3654        let own_book_order = order.to_own_book_order();
3655
3656        if order.is_closed() {
3657            if let Err(e) = own_book.delete(own_book_order) {
3658                log::debug!(
3659                    "Failed to delete order {} from own book: {e}",
3660                    order.client_order_id(),
3661                );
3662            } else {
3663                log::debug!("Deleted order {} from own book", order.client_order_id());
3664            }
3665        } else {
3666            // Add or update the order in the own book
3667            if let Err(e) = own_book.update(own_book_order) {
3668                log::debug!(
3669                    "Failed to update order {} in own book: {e}; inserting instead",
3670                    order.client_order_id(),
3671                );
3672                own_book.add(own_book_order);
3673            }
3674            log::debug!("Updated order {} in own book", order.client_order_id());
3675        }
3676    }
3677
3678    /// Force removal of an order from own order books and clean up all indexes.
3679    ///
3680    /// This method is used when order event application fails and we need to ensure
3681    /// terminal orders are properly cleaned up from own books and all relevant indexes.
3682    /// Replicates the index cleanup that update_order performs for closed orders.
3683    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
3684        let order = match self.orders.get(client_order_id) {
3685            Some(order) => order,
3686            None => return,
3687        };
3688
3689        self.index.orders_open.remove(client_order_id);
3690        self.index.orders_pending_cancel.remove(client_order_id);
3691        self.index.orders_inflight.remove(client_order_id);
3692        self.index.orders_emulated.remove(client_order_id);
3693
3694        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
3695            && order.has_price()
3696        {
3697            let own_book_order = order.to_own_book_order();
3698            if let Err(e) = own_book.delete(own_book_order) {
3699                log::debug!("Could not force delete {client_order_id} from own book: {e}");
3700            } else {
3701                log::debug!("Force deleted {client_order_id} from own book");
3702            }
3703        }
3704
3705        self.index.orders_closed.insert(*client_order_id);
3706    }
3707
3708    /// Audit all own order books against open and inflight order indexes.
3709    ///
3710    /// Ensures closed orders are removed from own order books. This includes both
3711    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
3712    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
3713    /// during venue latency windows.
3714    pub fn audit_own_order_books(&mut self) {
3715        log::debug!("Starting own books audit");
3716        let start = std::time::Instant::now();
3717
3718        // Build union of open and inflight orders for audit,
3719        // this prevents false positives for SUBMITTED orders during venue latency.
3720        let valid_order_ids: AHashSet<ClientOrderId> = self
3721            .index
3722            .orders_open
3723            .union(&self.index.orders_inflight)
3724            .copied()
3725            .collect();
3726
3727        for own_book in self.own_books.values_mut() {
3728            own_book.audit_open_orders(&valid_order_ids);
3729        }
3730
3731        log::debug!("Completed own books audit in {:?}", start.elapsed());
3732    }
3733}