Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24pub mod refs;
25
26mod index;
27
28#[cfg(test)]
29mod tests;
30
31use std::{
32    borrow::Cow,
33    cell::{Ref, RefCell},
34    collections::VecDeque,
35    fmt::{Debug, Display},
36    rc::Rc,
37    str::FromStr,
38    time::{SystemTime, UNIX_EPOCH},
39};
40
41use ahash::{AHashMap, AHashSet};
42use bytes::Bytes;
43pub use config::CacheConfig; // Re-export
44use database::{CacheDatabaseAdapter, CacheMap};
45use index::CacheIndex;
46use nautilus_core::{
47    SharedCell, UUID4, UnixNanos,
48    correctness::{
49        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
50        check_valid_string_ascii,
51    },
52    datetime::secs_to_nanos_unchecked,
53};
54use nautilus_model::{
55    accounts::{Account, AccountAny},
56    data::{
57        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
58        MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
59    },
60    enums::{
61        AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
62        PriceType, TriggerType,
63    },
64    events::{AccountState, OrderEventAny},
65    identifiers::{
66        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
67        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
68    },
69    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
70    orderbook::{
71        OrderBook,
72        own::{OwnOrderBook, should_handle_own_book_order},
73    },
74    orders::{Order, OrderAny, OrderError, OrderList},
75    position::Position,
76    types::{Currency, Money, Price, Quantity},
77};
78pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
79use ustr::Ustr;
80
81use crate::xrate::get_exchange_rate;
82
/// Cache-owned reference to a snapshot blob.
///
/// The cache writes and later fetches the blob; external systems persist this opaque reference
/// and may hash the bytes before recording a durable anchor.
///
/// Equality is derived, so two references compare equal only when both the reference string
/// and the blob bytes match.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CacheSnapshotRef {
    /// Opaque cache-owned snapshot location.
    pub blob_ref: String,
    /// Snapshot bytes stored under [`Self::blob_ref`].
    pub blob: Bytes,
}
94
95impl CacheSnapshotRef {
96    /// Creates a new [`CacheSnapshotRef`].
97    #[must_use]
98    pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
99        Self {
100            blob_ref: blob_ref.into(),
101            blob: blob.into(),
102        }
103    }
104}
105
/// Read-only view over the platform cache.
///
/// Adapter-facing code receives this type instead of the mutable cache handle so cache writes stay
/// owned by the data and execution engines.
///
/// Cloning a view clones the inner `Rc` handle, so every clone observes the same cache.
#[derive(Clone, Debug)]
pub struct CacheView {
    // Shared handle to the underlying cache; only immutable borrows are exposed through the view.
    inner: Rc<RefCell<Cache>>,
}
114
115impl CacheView {
116    /// Creates a new [`CacheView`] from a cache handle.
117    #[must_use]
118    pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
119        Self { inner }
120    }
121
122    /// Borrows the cache immutably.
123    ///
124    /// # Panics
125    ///
126    /// Panics if the cache is already mutably borrowed.
127    pub fn borrow(&self) -> Ref<'_, Cache> {
128        self.inner.borrow()
129    }
130}
131
132impl From<Rc<RefCell<Cache>>> for CacheView {
133    fn from(inner: Rc<RefCell<Cache>>) -> Self {
134        Self::new(inner)
135    }
136}
137
// Filter sources resolved from an order or position query.
//
// Captures the three states of a multi-key index intersection without committing to an owned
// result set: no filters at all (the caller iterates the bucket directly), one or more filter
// sources resolved successfully (intersect them lazily), or one filter resolved to no entries
// at all (the result is unconditionally empty).
enum FilterSources<'a, K> {
    // No filters were supplied; the caller iterates the bucket directly.
    Unfiltered,
    // A filter resolved to no entries at all; the result is unconditionally empty.
    Empty,
    // One or more filters resolved to borrowed index sets that still need intersecting.
    Sets(Vec<&'a AHashSet<K>>),
}
149
// Computes the intersection of every set in `sources` and returns the shared keys as one owned
// `AHashSet`.
//
// The sources are ordered by ascending length so the smallest set drives the scan while each
// candidate key is probed against the remaining sets. A lone source is returned through a
// plain `clone` rather than being re-collected key by key.
//
// `sources` must be non-empty: this is checked by `debug_assert!`, and the indexing below
// panics on an empty vector.
fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
where
    K: Copy + Eq + std::hash::Hash,
{
    debug_assert!(!sources.is_empty());
    sources.sort_unstable_by(|lhs, rhs| lhs.len().cmp(&rhs.len()));

    let smallest = sources[0];
    let others = &sources[1..];

    match others {
        // Nothing to intersect against: hand back a copy of the only source
        [] => smallest.clone(),
        _ => smallest
            .iter()
            .copied()
            .filter(|key| others.iter().all(|set| set.contains(key)))
            .collect(),
    }
}
174
175// Intersects `bucket` with one or more filter sources.
176//
177// For exactly one filter source, iterates the larger of (bucket, filter) and looks up in the
178// smaller. The larger set scans linearly (HW-prefetcher friendly) and the smaller stays hot in
179// cache, which empirically beats the size-ordered approach when the smaller filter is too
180// large to fit in L1 (e.g., a 20k-entry venue filter against a 100k-entry bucket). For two or
181// more filters the size-ordered driver is reinstated and the bucket joins the source list.
182fn intersect_pair_or_many<'a, K>(
183    bucket: &'a AHashSet<K>,
184    mut sources: Vec<&'a AHashSet<K>>,
185) -> AHashSet<K>
186where
187    K: Copy + Eq + std::hash::Hash,
188{
189    debug_assert!(!sources.is_empty());
190    if sources.len() == 1 {
191        let filter = sources[0];
192        let (larger, smaller) = if bucket.len() >= filter.len() {
193            (bucket, filter)
194        } else {
195            (filter, bucket)
196        };
197        return larger.intersection(smaller).copied().collect();
198    }
199
200    sources.push(bucket);
201    intersect_filter_sources(sources)
202}
203
/// A common in-memory `Cache` for market and execution related data.
///
/// Holds keyed collections of reference data, market data and execution state, a [`CacheIndex`]
/// of cross-references over orders and positions, and an optional database adapter.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    // Configuration, lookup index and optional database adapter
    config: CacheConfig,
    index: CacheIndex,
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    // General key/value byte entries
    general: AHashMap<String, Bytes>,
    // Reference data
    currencies: AHashMap<Ustr, Currency>,
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    // Order books keyed by instrument
    books: AHashMap<InstrumentId, OrderBook>,
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    // Market data keyed by instrument (or bar type), stored as deques
    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
    instrument_statuses: AHashMap<InstrumentId, VecDeque<InstrumentStatus>>,
    bars: AHashMap<BarType, VecDeque<Bar>>,
    // Derived analytics
    greeks: AHashMap<InstrumentId, GreeksData>,
    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
    yield_curves: AHashMap<String, YieldCurveData>,
    // Execution state; accounts, orders and positions are held in shared cells
    accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
    orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
    order_lists: AHashMap<OrderListId, OrderList>,
    positions: AHashMap<PositionId, SharedCell<Position>>,
    position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
    // DeFi cache, only compiled with the `defi` feature
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
238
239impl Debug for Cache {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct(stringify!(Cache))
242            .field("config", &self.config)
243            .field("index", &self.index)
244            .field("general", &self.general)
245            .field("currencies", &self.currencies)
246            .field("instruments", &self.instruments)
247            .field("synthetics", &self.synthetics)
248            .field("books", &self.books)
249            .field("own_books", &self.own_books)
250            .field("quotes", &self.quotes)
251            .field("trades", &self.trades)
252            .field("mark_xrates", &self.mark_xrates)
253            .field("mark_prices", &self.mark_prices)
254            .field("index_prices", &self.index_prices)
255            .field("funding_rates", &self.funding_rates)
256            .field("instrument_statuses", &self.instrument_statuses)
257            .field("bars", &self.bars)
258            .field("greeks", &self.greeks)
259            .field("option_greeks", &self.option_greeks)
260            .field("yield_curves", &self.yield_curves)
261            .field("accounts", &self.accounts)
262            .field("orders", &self.orders)
263            .field("order_lists", &self.order_lists)
264            .field("positions", &self.positions)
265            .field("position_snapshots", &self.position_snapshots)
266            .finish()
267    }
268}
269
270impl Default for Cache {
271    /// Creates a new default [`Cache`] instance.
272    fn default() -> Self {
273        Self::new(Some(CacheConfig::default()), None)
274    }
275}
276
277impl Cache {
278    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
279    #[must_use]
280    /// # Note
281    ///
282    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
283    pub fn new(
284        config: Option<CacheConfig>,
285        database: Option<Box<dyn CacheDatabaseAdapter>>,
286    ) -> Self {
287        Self {
288            config: config.unwrap_or_default(),
289            index: CacheIndex::default(),
290            database,
291            general: AHashMap::new(),
292            currencies: AHashMap::new(),
293            instruments: AHashMap::new(),
294            synthetics: AHashMap::new(),
295            books: AHashMap::new(),
296            own_books: AHashMap::new(),
297            quotes: AHashMap::new(),
298            trades: AHashMap::new(),
299            mark_xrates: AHashMap::new(),
300            mark_prices: AHashMap::new(),
301            index_prices: AHashMap::new(),
302            funding_rates: AHashMap::new(),
303            instrument_statuses: AHashMap::new(),
304            bars: AHashMap::new(),
305            greeks: AHashMap::new(),
306            option_greeks: AHashMap::new(),
307            yield_curves: AHashMap::new(),
308            accounts: AHashMap::new(),
309            orders: AHashMap::new(),
310            order_lists: AHashMap::new(),
311            positions: AHashMap::new(),
312            position_snapshots: AHashMap::new(),
313            #[cfg(feature = "defi")]
314            defi: crate::defi::cache::DefiCache::default(),
315        }
316    }
317
318    /// Returns the cache instances memory address.
319    #[must_use]
320    pub fn memory_address(&self) -> String {
321        format!("{:?}", std::ptr::from_ref(self))
322    }
323
324    /// Sets the cache database adapter for persistence.
325    ///
326    /// This allows setting or replacing the database adapter after cache construction.
327    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
328        let type_name = std::any::type_name_of_val(&*database);
329        log::info!("Cache database adapter set: {type_name}");
330        self.database = Some(database);
331    }
332
333    // -- COMMANDS --------------------------------------------------------------------------------
334
335    /// Clears and reloads general entries from the database into the cache.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if loading general cache data fails.
340    pub fn cache_general(&mut self) -> anyhow::Result<()> {
341        self.general = match &mut self.database {
342            Some(db) => db.load()?,
343            None => AHashMap::new(),
344        };
345
346        log::info!(
347            "Cached {} general object(s) from database",
348            self.general.len()
349        );
350        Ok(())
351    }
352
353    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if loading all cache data fails.
358    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
359        let cache_map = match &self.database {
360            Some(db) => db.load_all().await?,
361            None => CacheMap::default(),
362        };
363
364        self.currencies = cache_map.currencies;
365        self.instruments = cache_map.instruments;
366        self.synthetics = cache_map.synthetics;
367        self.accounts = cache_map
368            .accounts
369            .into_iter()
370            .map(|(id, account)| (id, SharedCell::new(account)))
371            .collect();
372        self.orders = cache_map
373            .orders
374            .into_iter()
375            .map(|(id, order)| (id, SharedCell::new(order)))
376            .collect();
377        self.positions = cache_map
378            .positions
379            .into_iter()
380            .map(|(id, position)| (id, SharedCell::new(position)))
381            .collect();
382
383        self.assign_position_ids_to_contingencies();
384        Ok(())
385    }
386
387    /// Clears and reloads the currency cache from the database.
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if loading currencies cache fails.
392    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
393        self.currencies = match &mut self.database {
394            Some(db) => db.load_currencies().await?,
395            None => AHashMap::new(),
396        };
397
398        log::info!("Cached {} currencies from database", self.general.len());
399        Ok(())
400    }
401
402    /// Clears and reloads the instrument cache from the database.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if loading instruments cache fails.
407    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
408        self.instruments = match &mut self.database {
409            Some(db) => db.load_instruments().await?,
410            None => AHashMap::new(),
411        };
412
413        log::info!("Cached {} instruments from database", self.general.len());
414        Ok(())
415    }
416
417    /// Clears and reloads the synthetic instrument cache from the database.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if loading synthetic instruments cache fails.
422    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
423        self.synthetics = match &mut self.database {
424            Some(db) => db.load_synthetics().await?,
425            None => AHashMap::new(),
426        };
427
428        log::info!(
429            "Cached {} synthetic instruments from database",
430            self.general.len()
431        );
432        Ok(())
433    }
434
435    /// Clears and reloads the account cache from the database.
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if loading accounts cache fails.
440    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
441        self.accounts = match &mut self.database {
442            Some(db) => db
443                .load_accounts()
444                .await?
445                .into_iter()
446                .map(|(id, account)| (id, SharedCell::new(account)))
447                .collect(),
448            None => AHashMap::new(),
449        };
450
451        log::info!(
452            "Cached {} synthetic instruments from database",
453            self.general.len()
454        );
455        Ok(())
456    }
457
458    /// Clears and reloads the order cache from the database.
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if loading orders cache fails.
463    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
464        self.orders = match &mut self.database {
465            Some(db) => db
466                .load_orders()
467                .await?
468                .into_iter()
469                .map(|(id, order)| (id, SharedCell::new(order)))
470                .collect(),
471            None => AHashMap::new(),
472        };
473
474        log::info!("Cached {} orders from database", self.general.len());
475
476        self.assign_position_ids_to_contingencies();
477        Ok(())
478    }
479
480    /// Clears and reloads the position cache from the database.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if loading positions cache fails.
485    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
486        self.positions = match &mut self.database {
487            Some(db) => db
488                .load_positions()
489                .await?
490                .into_iter()
491                .map(|(id, position)| (id, SharedCell::new(position)))
492                .collect(),
493            None => AHashMap::new(),
494        };
495
496        log::info!("Cached {} positions from database", self.general.len());
497        Ok(())
498    }
499
    /// Builds the cache index from the currently cached accounts, orders and positions.
    ///
    /// Every account, order and position held by the cache is inserted into the matching
    /// `self.index` lookup tables.
    ///
    /// NOTE(review): the body shown here only inserts into the index and never resets it, so
    /// entries from an earlier build would remain. Confirm whether callers clear the index
    /// first when a full rebuild is intended.
    pub fn build_index(&mut self) {
        log::debug!("Building index");

        // Index accounts: the account ID issuer is used as the `venue_account` key
        for account_id in self.accounts.keys() {
            self.index
                .venue_account
                .insert(account_id.get_issuer(), *account_id);
        }

        // Index orders
        for (client_order_id, order_cell) in &self.orders {
            let order = order_cell.borrow();
            let instrument_id = order.instrument_id();
            let venue = instrument_id.venue;
            let strategy_id = order.strategy_id();

            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
            self.index
                .venue_orders
                .entry(venue)
                .or_default()
                .insert(*client_order_id);

            // 2: Build index.venue_order_ids -> {VenueOrderId, ClientOrderId}
            if let Some(venue_order_id) = order.venue_order_id() {
                self.index
                    .venue_order_ids
                    .insert(venue_order_id, *client_order_id);
            }

            // 3: Build index.order_position -> {ClientOrderId, PositionId}
            if let Some(position_id) = order.position_id() {
                self.index
                    .order_position
                    .insert(*client_order_id, position_id);
            }

            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
            self.index
                .order_strategy
                .insert(*client_order_id, order.strategy_id());

            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
            self.index
                .instrument_orders
                .entry(instrument_id)
                .or_default()
                .insert(*client_order_id);

            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
            self.index
                .strategy_orders
                .entry(strategy_id)
                .or_default()
                .insert(*client_order_id);

            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
            if let Some(account_id) = order.account_id() {
                self.index
                    .account_orders
                    .entry(account_id)
                    .or_default()
                    .insert(*client_order_id);
            }

            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
                self.index
                    .exec_algorithm_orders
                    .entry(exec_algorithm_id)
                    .or_default()
                    .insert(*client_order_id);
            }

            // 9: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
            if let Some(exec_spawn_id) = order.exec_spawn_id() {
                self.index
                    .exec_spawn_orders
                    .entry(exec_spawn_id)
                    .or_default()
                    .insert(*client_order_id);
            }

            // 10: Build index.orders -> {ClientOrderId}
            self.index.orders.insert(*client_order_id);

            // 11: Build index.orders_active_local -> {ClientOrderId}
            if order.is_active_local() {
                self.index.orders_active_local.insert(*client_order_id);
            }

            // 12: Build index.orders_open -> {ClientOrderId}
            if order.is_open() {
                self.index.orders_open.insert(*client_order_id);
            }

            // 13: Build index.orders_closed -> {ClientOrderId}
            if order.is_closed() {
                self.index.orders_closed.insert(*client_order_id);
            }

            // 14: Build index.orders_emulated -> {ClientOrderId}
            // Only orders with a real emulation trigger that are not yet closed are included
            if let Some(emulation_trigger) = order.emulation_trigger()
                && emulation_trigger != TriggerType::NoTrigger
                && !order.is_closed()
            {
                self.index.orders_emulated.insert(*client_order_id);
            }

            // 15: Build index.orders_inflight -> {ClientOrderId}
            if order.is_inflight() {
                self.index.orders_inflight.insert(*client_order_id);
            }

            // 16: Build index.strategies -> {StrategyId}
            self.index.strategies.insert(strategy_id);

            // 17: Build index.exec_algorithms -> {ExecAlgorithmId}
            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
                self.index.exec_algorithms.insert(exec_algorithm_id);
            }
        }

        // Index positions
        for (position_id, position_cell) in &self.positions {
            let position = position_cell.borrow();
            let instrument_id = position.instrument_id;
            let venue = instrument_id.venue;
            let strategy_id = position.strategy_id;

            // 1: Build index.venue_positions -> {Venue, {PositionId}}
            self.index
                .venue_positions
                .entry(venue)
                .or_default()
                .insert(*position_id);

            // 2: Build index.position_strategy -> {PositionId, StrategyId}
            self.index
                .position_strategy
                .insert(*position_id, position.strategy_id);

            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
            self.index
                .position_orders
                .entry(*position_id)
                .or_default()
                .extend(position.client_order_ids());

            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
            self.index
                .instrument_positions
                .entry(instrument_id)
                .or_default()
                .insert(*position_id);

            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
            self.index
                .strategy_positions
                .entry(strategy_id)
                .or_default()
                .insert(*position_id);

            // 6: Build index.account_positions -> {AccountId, {PositionId}}
            self.index
                .account_positions
                .entry(position.account_id)
                .or_default()
                .insert(*position_id);

            // 7: Build index.positions -> {PositionId}
            self.index.positions.insert(*position_id);

            // 8: Build index.positions_open -> {PositionId}
            if position.is_open() {
                self.index.positions_open.insert(*position_id);
            }

            // 9: Build index.positions_closed -> {PositionId}
            if position.is_closed() {
                self.index.positions_closed.insert(*position_id);
            }

            // 10: Build index.strategies -> {StrategyId}
            self.index.strategies.insert(strategy_id);
        }
    }
689
    /// Returns whether the cache has a backing database.
    ///
    /// This is `true` exactly when a database adapter is currently set on the cache.
    #[must_use]
    pub const fn has_backing(&self) -> bool {
        self.database.is_some()
    }
695
696    // Calculate the unrealized profit and loss (PnL) for `position`.
697    #[must_use]
698    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
699        let Some(quote) = self.quote(&position.instrument_id) else {
700            log::warn!(
701                "Cannot calculate unrealized PnL for {}, no quotes for {}",
702                position.id,
703                position.instrument_id
704            );
705            return None;
706        };
707
708        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
709        let last = match position.side {
710            PositionSide::Flat | PositionSide::NoPositionSide => {
711                return Some(Money::new(0.0, position.settlement_currency));
712            }
713            PositionSide::Long => quote.bid_price,
714            PositionSide::Short => quote.ask_price,
715        };
716
717        Some(position.unrealized_pnl(last))
718    }
719
720    /// Checks integrity of data within the cache.
721    ///
722    /// All data should be loaded from the database prior to this call.
723    /// If an error is found then a log error message will also be produced.
724    ///
725    /// # Panics
726    ///
727    /// Panics if failure calling system clock.
728    #[must_use]
729    pub fn check_integrity(&mut self) -> bool {
730        let mut error_count = 0;
731        let failure = "Integrity failure";
732
733        // Get current timestamp in microseconds
734        let timestamp_us = SystemTime::now()
735            .duration_since(UNIX_EPOCH)
736            .expect("Time went backwards")
737            .as_micros();
738
739        log::info!("Checking data integrity");
740
741        // Check object caches
742        for account_id in self.accounts.keys() {
743            if !self
744                .index
745                .venue_account
746                .contains_key(&account_id.get_issuer())
747            {
748                log::error!(
749                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
750                );
751                error_count += 1;
752            }
753        }
754
755        for (client_order_id, order_cell) in &self.orders {
756            let order = order_cell.borrow();
757
758            if !self.index.order_strategy.contains_key(client_order_id) {
759                log::error!(
760                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
761                );
762                error_count += 1;
763            }
764
765            if !self.index.orders.contains(client_order_id) {
766                log::error!(
767                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
768                );
769                error_count += 1;
770            }
771
772            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
773                log::error!(
774                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
775                );
776                error_count += 1;
777            }
778
779            if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
780            {
781                log::error!(
782                    "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
783                );
784                error_count += 1;
785            }
786
787            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
788                log::error!(
789                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
790                );
791                error_count += 1;
792            }
793
794            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
795                log::error!(
796                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
797                );
798                error_count += 1;
799            }
800
801            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
802                if !self
803                    .index
804                    .exec_algorithm_orders
805                    .contains_key(&exec_algorithm_id)
806                {
807                    log::error!(
808                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
809                    );
810                    error_count += 1;
811                }
812
813                if order.exec_spawn_id().is_none()
814                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
815                {
816                    log::error!(
817                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
818                    );
819                    error_count += 1;
820                }
821            }
822        }
823
824        for (position_id, position_cell) in &self.positions {
825            let position = position_cell.borrow();
826
827            if !self.index.position_strategy.contains_key(position_id) {
828                log::error!(
829                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
830                );
831                error_count += 1;
832            }
833
834            if !self.index.position_orders.contains_key(position_id) {
835                log::error!(
836                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
837                );
838                error_count += 1;
839            }
840
841            if !self.index.positions.contains(position_id) {
842                log::error!(
843                    "{failure} in positions: {position_id} not found in `self.index.positions`",
844                );
845                error_count += 1;
846            }
847
848            if position.is_open() && !self.index.positions_open.contains(position_id) {
849                log::error!(
850                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
851                );
852                error_count += 1;
853            }
854
855            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
856                log::error!(
857                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
858                );
859                error_count += 1;
860            }
861        }
862
863        // Check indexes
864        for account_id in self.index.venue_account.values() {
865            if !self.accounts.contains_key(account_id) {
866                log::error!(
867                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
868                );
869                error_count += 1;
870            }
871        }
872
873        for client_order_id in self.index.venue_order_ids.values() {
874            if !self.orders.contains_key(client_order_id) {
875                log::error!(
876                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
877                );
878                error_count += 1;
879            }
880        }
881
882        for client_order_id in self.index.client_order_ids.keys() {
883            if !self.orders.contains_key(client_order_id) {
884                log::error!(
885                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
886                );
887                error_count += 1;
888            }
889        }
890
891        for client_order_id in self.index.order_position.keys() {
892            if !self.orders.contains_key(client_order_id) {
893                log::error!(
894                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
895                );
896                error_count += 1;
897            }
898        }
899
900        // Check indexes
901        for client_order_id in self.index.order_strategy.keys() {
902            if !self.orders.contains_key(client_order_id) {
903                log::error!(
904                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
905                );
906                error_count += 1;
907            }
908        }
909
910        for position_id in self.index.position_strategy.keys() {
911            if !self.positions.contains_key(position_id) {
912                log::error!(
913                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
914                );
915                error_count += 1;
916            }
917        }
918
919        for position_id in self.index.position_orders.keys() {
920            if !self.positions.contains_key(position_id) {
921                log::error!(
922                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
923                );
924                error_count += 1;
925            }
926        }
927
928        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
929            for client_order_id in client_order_ids {
930                if !self.orders.contains_key(client_order_id) {
931                    log::error!(
932                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
933                    );
934                    error_count += 1;
935                }
936            }
937        }
938
939        for instrument_id in self.index.instrument_positions.keys() {
940            if !self.index.instrument_orders.contains_key(instrument_id) {
941                log::error!(
942                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
943                );
944                error_count += 1;
945            }
946        }
947
948        for client_order_ids in self.index.strategy_orders.values() {
949            for client_order_id in client_order_ids {
950                if !self.orders.contains_key(client_order_id) {
951                    log::error!(
952                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
953                    );
954                    error_count += 1;
955                }
956            }
957        }
958
959        for position_ids in self.index.strategy_positions.values() {
960            for position_id in position_ids {
961                if !self.positions.contains_key(position_id) {
962                    log::error!(
963                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
964                    );
965                    error_count += 1;
966                }
967            }
968        }
969
970        for client_order_id in &self.index.orders {
971            if !self.orders.contains_key(client_order_id) {
972                log::error!(
973                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
974                );
975                error_count += 1;
976            }
977        }
978
979        for client_order_id in &self.index.orders_emulated {
980            if !self.orders.contains_key(client_order_id) {
981                log::error!(
982                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
983                );
984                error_count += 1;
985            }
986        }
987
988        for client_order_id in &self.index.orders_active_local {
989            if !self.orders.contains_key(client_order_id) {
990                log::error!(
991                    "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
992                );
993                error_count += 1;
994            }
995        }
996
997        for client_order_id in &self.index.orders_inflight {
998            if !self.orders.contains_key(client_order_id) {
999                log::error!(
1000                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
1001                );
1002                error_count += 1;
1003            }
1004        }
1005
1006        for client_order_id in &self.index.orders_open {
1007            if !self.orders.contains_key(client_order_id) {
1008                log::error!(
1009                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
1010                );
1011                error_count += 1;
1012            }
1013        }
1014
1015        for client_order_id in &self.index.orders_closed {
1016            if !self.orders.contains_key(client_order_id) {
1017                log::error!(
1018                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
1019                );
1020                error_count += 1;
1021            }
1022        }
1023
1024        for position_id in &self.index.positions {
1025            if !self.positions.contains_key(position_id) {
1026                log::error!(
1027                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
1028                );
1029                error_count += 1;
1030            }
1031        }
1032
1033        for position_id in &self.index.positions_open {
1034            if !self.positions.contains_key(position_id) {
1035                log::error!(
1036                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
1037                );
1038                error_count += 1;
1039            }
1040        }
1041
1042        for position_id in &self.index.positions_closed {
1043            if !self.positions.contains_key(position_id) {
1044                log::error!(
1045                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
1046                );
1047                error_count += 1;
1048            }
1049        }
1050
1051        for strategy_id in &self.index.strategies {
1052            if !self.index.strategy_orders.contains_key(strategy_id) {
1053                log::error!(
1054                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
1055                );
1056                error_count += 1;
1057            }
1058        }
1059
1060        for exec_algorithm_id in &self.index.exec_algorithms {
1061            if !self
1062                .index
1063                .exec_algorithm_orders
1064                .contains_key(exec_algorithm_id)
1065            {
1066                log::error!(
1067                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
1068                );
1069                error_count += 1;
1070            }
1071        }
1072
1073        let total_us = SystemTime::now()
1074            .duration_since(UNIX_EPOCH)
1075            .expect("Time went backwards")
1076            .as_micros()
1077            - timestamp_us;
1078
1079        if error_count == 0 {
1080            log::info!("Integrity check passed in {total_us}μs");
1081            true
1082        } else {
1083            log::error!(
1084                "Integrity check failed with {error_count} error{} in {total_us}μs",
1085                if error_count == 1 { "" } else { "s" },
1086            );
1087            false
1088        }
1089    }
1090
1091    /// Checks for any residual open state and log warnings if any are found.
1092    ///
1093    ///'Open state' is considered to be open orders and open positions.
1094    #[must_use]
1095    pub fn check_residuals(&self) -> bool {
1096        log::debug!("Checking residuals");
1097
1098        let mut residuals = false;
1099
1100        // Check for any open orders
1101        for order in self.orders_open(None, None, None, None, None) {
1102            residuals = true;
1103            log::warn!("Residual {order}");
1104        }
1105
1106        // Check for any open positions
1107        for position in self.positions_open(None, None, None, None, None) {
1108            residuals = true;
1109            log::warn!("Residual {position}");
1110        }
1111
1112        residuals
1113    }
1114
1115    /// Purges all closed orders from the cache that are older than `buffer_secs`.
1116    ///
1117    ///
1118    /// Only orders that have been closed for at least this amount of time will be purged.
1119    /// A value of 0 means purge all closed orders regardless of when they were closed.
1120    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1121        log::debug!(
1122            "Purging closed orders{}",
1123            if buffer_secs > 0 {
1124                format!(" with buffer_secs={buffer_secs}")
1125            } else {
1126                String::new()
1127            }
1128        );
1129
1130        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1131
1132        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
1133
1134        'outer: for client_order_id in self.index.orders_closed.clone() {
1135            let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
1136                let order = order_cell.borrow();
1137                if order.is_closed()
1138                    && let Some(ts_closed) = order.ts_closed()
1139                    && ts_closed + buffer_ns <= ts_now
1140                {
1141                    let linked = order.linked_order_ids().map(<[_]>::to_vec);
1142                    let order_list_id = order.order_list_id();
1143                    Some((linked, order_list_id))
1144                } else {
1145                    None
1146                }
1147            });
1148
1149            let Some((linked, order_list_id)) = purge_target else {
1150                continue;
1151            };
1152
1153            // Check any linked orders (contingency orders)
1154            if let Some(linked_order_ids) = linked {
1155                for linked_order_id in &linked_order_ids {
1156                    if let Some(linked_order_cell) = self.orders.get(linked_order_id)
1157                        && linked_order_cell.borrow().is_open()
1158                    {
1159                        // Do not purge if linked order still open
1160                        continue 'outer;
1161                    }
1162                }
1163            }
1164
1165            if let Some(order_list_id) = order_list_id {
1166                affected_order_list_ids.insert(order_list_id);
1167            }
1168
1169            self.purge_order(client_order_id);
1170        }
1171
1172        for order_list_id in affected_order_list_ids {
1173            if let Some(order_list) = self.order_lists.get(&order_list_id) {
1174                let all_purged = order_list
1175                    .client_order_ids
1176                    .iter()
1177                    .all(|id| !self.orders.contains_key(id));
1178
1179                if all_purged {
1180                    self.order_lists.remove(&order_list_id);
1181                    log::info!("Purged {order_list_id}");
1182                }
1183            }
1184        }
1185    }
1186
1187    /// Purges all closed positions from the cache that are older than `buffer_secs`.
1188    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1189        log::debug!(
1190            "Purging closed positions{}",
1191            if buffer_secs > 0 {
1192                format!(" with buffer_secs={buffer_secs}")
1193            } else {
1194                String::new()
1195            }
1196        );
1197
1198        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1199
1200        for position_id in self.index.positions_closed.clone() {
1201            let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
1202                let position = cell.borrow();
1203                position.is_closed()
1204                    && position
1205                        .ts_closed
1206                        .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
1207            });
1208
1209            if should_purge {
1210                self.purge_position(position_id);
1211            }
1212        }
1213    }
1214
1215    /// Purges the order with the `client_order_id` from the cache (if found).
1216    ///
1217    /// For safety, an order is prevented from being purged if it's open.
1218    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1219        // Check if order exists and is safe to purge before removing
1220        let order_cell = self.orders.get(&client_order_id).cloned();
1221
1222        // Prevent purging open orders
1223        if let Some(ref order_cell) = order_cell
1224            && order_cell.borrow().is_open()
1225        {
1226            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1227            return;
1228        }
1229
1230        // If order exists in cache, remove it and clean up order-specific indices
1231        if let Some(ref order_cell) = order_cell {
1232            let order = order_cell.borrow();
1233            // Safe to purge
1234            self.orders.remove(&client_order_id);
1235
1236            // Remove order from venue index
1237            if let Some(venue_orders) = self
1238                .index
1239                .venue_orders
1240                .get_mut(&order.instrument_id().venue)
1241            {
1242                venue_orders.remove(&client_order_id);
1243                if venue_orders.is_empty() {
1244                    self.index.venue_orders.remove(&order.instrument_id().venue);
1245                }
1246            }
1247
1248            // Remove venue order ID index if exists
1249            if let Some(venue_order_id) = order.venue_order_id() {
1250                self.index.venue_order_ids.remove(&venue_order_id);
1251            }
1252
1253            // Remove from instrument orders index
1254            if let Some(instrument_orders) =
1255                self.index.instrument_orders.get_mut(&order.instrument_id())
1256            {
1257                instrument_orders.remove(&client_order_id);
1258                if instrument_orders.is_empty() {
1259                    self.index.instrument_orders.remove(&order.instrument_id());
1260                }
1261            }
1262
1263            // Remove from position orders index if associated with a position
1264            if let Some(position_id) = order.position_id()
1265                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1266            {
1267                position_orders.remove(&client_order_id);
1268                if position_orders.is_empty() {
1269                    self.index.position_orders.remove(&position_id);
1270                }
1271            }
1272
1273            // Remove from exec algorithm orders index if it has an exec algorithm
1274            if let Some(exec_algorithm_id) = order.exec_algorithm_id()
1275                && let Some(exec_algorithm_orders) =
1276                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1277            {
1278                exec_algorithm_orders.remove(&client_order_id);
1279                if exec_algorithm_orders.is_empty() {
1280                    self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1281                }
1282            }
1283
1284            // Clean up strategy orders reverse index
1285            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
1286            {
1287                strategy_orders.remove(&client_order_id);
1288                if strategy_orders.is_empty() {
1289                    self.index.strategy_orders.remove(&order.strategy_id());
1290                }
1291            }
1292
1293            // Clean up account orders index
1294            if let Some(account_id) = order.account_id()
1295                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1296            {
1297                account_orders.remove(&client_order_id);
1298                if account_orders.is_empty() {
1299                    self.index.account_orders.remove(&account_id);
1300                }
1301            }
1302
1303            // Clean up exec spawn reverse index (if this order is a spawned child)
1304            if let Some(exec_spawn_id) = order.exec_spawn_id()
1305                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1306            {
1307                spawn_orders.remove(&client_order_id);
1308                if spawn_orders.is_empty() {
1309                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1310                }
1311            }
1312
1313            log::info!("Purged order {client_order_id}");
1314        } else {
1315            log::warn!("Order {client_order_id} not found when purging");
1316        }
1317
1318        // Always clean up order indices (even if order was not in cache)
1319        self.index.order_position.remove(&client_order_id);
1320        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1321        self.index.order_client.remove(&client_order_id);
1322        self.index.client_order_ids.remove(&client_order_id);
1323
1324        // Clean up reverse index when order not in cache (using forward index)
1325        if let Some(strategy_id) = strategy_id
1326            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1327        {
1328            strategy_orders.remove(&client_order_id);
1329            if strategy_orders.is_empty() {
1330                self.index.strategy_orders.remove(&strategy_id);
1331            }
1332        }
1333
1334        // Remove spawn parent entry if this order was a spawn root
1335        self.index.exec_spawn_orders.remove(&client_order_id);
1336
1337        self.index.orders.remove(&client_order_id);
1338        self.index.orders_active_local.remove(&client_order_id);
1339        self.index.orders_open.remove(&client_order_id);
1340        self.index.orders_closed.remove(&client_order_id);
1341        self.index.orders_emulated.remove(&client_order_id);
1342        self.index.orders_inflight.remove(&client_order_id);
1343        self.index.orders_pending_cancel.remove(&client_order_id);
1344    }
1345
1346    /// Purges the position with the `position_id` from the cache (if found).
1347    ///
1348    /// For safety, a position is prevented from being purged if it's open.
1349    pub fn purge_position(&mut self, position_id: PositionId) {
1350        // Snapshot the position so we can release the borrow before mutating indexes.
1351        let position = self
1352            .positions
1353            .get(&position_id)
1354            .map(|cell| cell.borrow().clone());
1355
1356        // Prevent purging open positions
1357        if let Some(ref pos) = position
1358            && pos.is_open()
1359        {
1360            log::warn!("Position {position_id} found open when purging, skipping purge");
1361            return;
1362        }
1363
1364        // If position exists in cache, remove it and clean up position-specific indices
1365        if let Some(ref pos) = position {
1366            self.positions.remove(&position_id);
1367
1368            // Remove from venue positions index
1369            if let Some(venue_positions) =
1370                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1371            {
1372                venue_positions.remove(&position_id);
1373                if venue_positions.is_empty() {
1374                    self.index.venue_positions.remove(&pos.instrument_id.venue);
1375                }
1376            }
1377
1378            // Remove from instrument positions index
1379            if let Some(instrument_positions) =
1380                self.index.instrument_positions.get_mut(&pos.instrument_id)
1381            {
1382                instrument_positions.remove(&position_id);
1383                if instrument_positions.is_empty() {
1384                    self.index.instrument_positions.remove(&pos.instrument_id);
1385                }
1386            }
1387
1388            // Remove from strategy positions index
1389            if let Some(strategy_positions) =
1390                self.index.strategy_positions.get_mut(&pos.strategy_id)
1391            {
1392                strategy_positions.remove(&position_id);
1393                if strategy_positions.is_empty() {
1394                    self.index.strategy_positions.remove(&pos.strategy_id);
1395                }
1396            }
1397
1398            // Remove from account positions index
1399            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1400                account_positions.remove(&position_id);
1401                if account_positions.is_empty() {
1402                    self.index.account_positions.remove(&pos.account_id);
1403                }
1404            }
1405
1406            // Remove position ID from orders that reference it
1407            for client_order_id in pos.client_order_ids() {
1408                self.index.order_position.remove(&client_order_id);
1409            }
1410
1411            log::info!("Purged position {position_id}");
1412        } else {
1413            log::warn!("Position {position_id} not found when purging");
1414        }
1415
1416        // Always clean up position indices (even if position not in cache)
1417        self.index.position_strategy.remove(&position_id);
1418        self.index.position_orders.remove(&position_id);
1419        self.index.positions.remove(&position_id);
1420        self.index.positions_open.remove(&position_id);
1421        self.index.positions_closed.remove(&position_id);
1422
1423        // Always clean up position snapshots (even if position not in cache)
1424        self.position_snapshots.remove(&position_id);
1425    }
1426
1427    /// Purges the instrument with the `instrument_id` from the cache (if found).
1428    ///
1429    /// All cache-owned data keyed by the instrument is removed: the instrument record,
1430    /// any synthetic with the same id, order book and own-order-book state, quote/trade
1431    /// histories, mark/index/funding price histories, instrument status, bars for any
1432    /// `BarType` referencing the instrument, and the `instrument_orders` /
1433    /// `instrument_positions` index entries.
1434    ///
1435    /// For safety, an instrument is prevented from being purged while any associated
1436    /// order is non-terminal (anything not in `orders_closed`, including
1437    /// initialized, submitted, accepted, emulated, released, or inflight states) or
1438    /// any associated position is non-closed.
1439    ///
1440    /// Active subscriptions and other live data-engine state are not touched here;
1441    /// those belong to the data and execution engines.
1442    ///
1443    /// # Warning
1444    ///
1445    /// Intended for actors and strategies that have their own lifecycle logic for
1446    /// deciding when an instrument is no longer needed. Purging an instrument that any
1447    /// other actor, strategy, or engine still relies on may cause incorrect behavior
1448    /// (missing instrument lookups, lost market-data history). The caller is
1449    /// responsible for ensuring the instrument is no longer in use before purging.
1450    pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
1451        #[cfg(feature = "defi")]
1452        let defi_found = self.defi.pools.contains_key(&instrument_id)
1453            || self.defi.pool_profilers.contains_key(&instrument_id);
1454        #[cfg(not(feature = "defi"))]
1455        let defi_found = false;
1456
1457        let found = self.instruments.contains_key(&instrument_id)
1458            || self.synthetics.contains_key(&instrument_id)
1459            || defi_found;
1460
1461        if !found {
1462            log::warn!("Instrument {instrument_id} not found when purging");
1463            return;
1464        }
1465
1466        if let Some(orders) = self.index.instrument_orders.get(&instrument_id) {
1467            let has_non_terminal = orders
1468                .iter()
1469                .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
1470
1471            if has_non_terminal {
1472                log::warn!(
1473                    "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
1474                );
1475                return;
1476            }
1477        }
1478
1479        if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
1480            let has_non_closed = positions
1481                .iter()
1482                .any(|position_id| !self.index.positions_closed.contains(position_id));
1483
1484            if has_non_closed {
1485                log::warn!(
1486                    "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
1487                );
1488                return;
1489            }
1490        }
1491
1492        self.instruments.remove(&instrument_id);
1493        self.synthetics.remove(&instrument_id);
1494        self.books.remove(&instrument_id);
1495        self.own_books.remove(&instrument_id);
1496        self.quotes.remove(&instrument_id);
1497        self.trades.remove(&instrument_id);
1498        self.mark_prices.remove(&instrument_id);
1499        self.index_prices.remove(&instrument_id);
1500        self.funding_rates.remove(&instrument_id);
1501        self.instrument_statuses.remove(&instrument_id);
1502        self.greeks.remove(&instrument_id);
1503        self.option_greeks.remove(&instrument_id);
1504
1505        self.bars
1506            .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
1507
1508        #[cfg(feature = "defi")]
1509        {
1510            self.defi.pools.remove(&instrument_id);
1511            self.defi.pool_profilers.remove(&instrument_id);
1512        }
1513
1514        self.index.instrument_orders.remove(&instrument_id);
1515        self.index.instrument_positions.remove(&instrument_id);
1516
1517        log::info!("Purged instrument {instrument_id}");
1518    }
1519
1520    /// Purges all account state events which are outside the lookback window.
1521    ///
1522    /// Only events which are outside the lookback window will be purged.
1523    /// A value of 0 means purge all account state events.
1524    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1525        log::debug!(
1526            "Purging account events{}",
1527            if lookback_secs > 0 {
1528                format!(" with lookback_secs={lookback_secs}")
1529            } else {
1530                String::new()
1531            }
1532        );
1533
1534        for account_cell in self.accounts.values() {
1535            let mut account = account_cell.borrow_mut();
1536            let event_count = account.event_count();
1537            account.purge_account_events(ts_now, lookback_secs);
1538            let count_diff = event_count - account.event_count();
1539            if count_diff > 0 {
1540                log::info!(
1541                    "Purged {} event(s) from account {}",
1542                    count_diff,
1543                    account.id()
1544                );
1545            }
1546        }
1547    }
1548
1549    /// Clears the caches index.
1550    pub fn clear_index(&mut self) {
1551        self.index.clear();
1552        log::debug!("Cleared index");
1553    }
1554
1555    /// Resets the cache.
1556    ///
1557    /// All stateful fields are reset to their initial value. Instruments,
1558    /// currencies and synthetics are retained when `drop_instruments_on_reset`
1559    /// is `false` so that repeated backtest runs can reuse the same dataset.
1560    pub fn reset(&mut self) {
1561        log::debug!("Resetting cache");
1562
1563        self.general.clear();
1564        self.books.clear();
1565        self.own_books.clear();
1566        self.quotes.clear();
1567        self.trades.clear();
1568        self.mark_xrates.clear();
1569        self.mark_prices.clear();
1570        self.index_prices.clear();
1571        self.funding_rates.clear();
1572        self.instrument_statuses.clear();
1573        self.bars.clear();
1574        self.accounts.clear();
1575        self.orders.clear();
1576        self.order_lists.clear();
1577        self.positions.clear();
1578        self.position_snapshots.clear();
1579        self.greeks.clear();
1580        self.yield_curves.clear();
1581
1582        if self.config.drop_instruments_on_reset {
1583            self.currencies.clear();
1584            self.instruments.clear();
1585            self.synthetics.clear();
1586        }
1587
1588        #[cfg(feature = "defi")]
1589        {
1590            self.defi.pools.clear();
1591            self.defi.pool_profilers.clear();
1592        }
1593
1594        self.clear_index();
1595
1596        log::info!("Reset cache");
1597    }
1598
1599    /// Dispose of the cache which will close any underlying database adapter.
1600    ///
1601    /// If closing the database connection fails, an error is logged.
1602    pub fn dispose(&mut self) {
1603        self.reset();
1604
1605        if let Some(database) = &mut self.database
1606            && let Err(e) = database.close()
1607        {
1608            log::error!("Failed to close database during dispose: {e}");
1609        }
1610    }
1611
1612    /// Flushes the caches database which permanently removes all persisted data.
1613    ///
1614    /// If flushing the database connection fails, an error is logged.
1615    pub fn flush_db(&mut self) {
1616        if let Some(database) = &mut self.database
1617            && let Err(e) = database.flush()
1618        {
1619            log::error!("Failed to flush database: {e}");
1620        }
1621    }
1622
1623    /// Adds a raw bytes `value` to the cache under the `key`.
1624    ///
1625    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1626    ///
1627    /// # Errors
1628    ///
1629    /// Returns an error if persisting the entry to the backing database fails.
1630    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1631        check_valid_string_ascii(key, stringify!(key))?;
1632        check_predicate_false(value.is_empty(), stringify!(value))?;
1633
1634        log::debug!("Adding general {key}");
1635        self.general.insert(key.to_string(), value.clone());
1636
1637        if let Some(database) = &mut self.database {
1638            database.add(key.to_string(), value)?;
1639        }
1640        Ok(())
1641    }
1642
1643    /// Adds an `OrderBook` to the cache.
1644    ///
1645    /// # Errors
1646    ///
1647    /// Returns an error if persisting the order book to the backing database fails.
1648    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1649        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1650
1651        if self.config.save_market_data
1652            && let Some(database) = &mut self.database
1653        {
1654            database.add_order_book(&book)?;
1655        }
1656
1657        self.books.insert(book.instrument_id, book);
1658        Ok(())
1659    }
1660
1661    /// Adds an `OwnOrderBook` to the cache.
1662    ///
1663    /// # Errors
1664    ///
1665    /// Returns an error if persisting the own order book fails.
1666    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1667        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1668
1669        self.own_books.insert(own_book.instrument_id, own_book);
1670        Ok(())
1671    }
1672
1673    /// Adds the `mark_price` update to the cache.
1674    ///
1675    /// # Errors
1676    ///
1677    /// Returns an error if persisting the mark price to the backing database fails.
1678    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1679        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1680
1681        if self.config.save_market_data {
1682            // TODO: Placeholder and return Result for consistency
1683        }
1684
1685        let mark_prices_deque = self
1686            .mark_prices
1687            .entry(mark_price.instrument_id)
1688            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1689        mark_prices_deque.push_front(mark_price);
1690        Ok(())
1691    }
1692
1693    /// Adds the `index_price` update to the cache.
1694    ///
1695    /// # Errors
1696    ///
1697    /// Returns an error if persisting the index price to the backing database fails.
1698    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1699        log::debug!(
1700            "Adding `IndexPriceUpdate` for {}",
1701            index_price.instrument_id
1702        );
1703
1704        if self.config.save_market_data {
1705            // TODO: Placeholder and return Result for consistency
1706        }
1707
1708        let index_prices_deque = self
1709            .index_prices
1710            .entry(index_price.instrument_id)
1711            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1712        index_prices_deque.push_front(index_price);
1713        Ok(())
1714    }
1715
1716    /// Adds the `funding_rate` update to the cache.
1717    ///
1718    /// # Errors
1719    ///
1720    /// Returns an error if persisting the funding rate update to the backing database fails.
1721    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1722        log::debug!(
1723            "Adding `FundingRateUpdate` for {}",
1724            funding_rate.instrument_id
1725        );
1726
1727        if self.config.save_market_data {
1728            // TODO: Placeholder and return Result for consistency
1729        }
1730
1731        let funding_rates_deque = self
1732            .funding_rates
1733            .entry(funding_rate.instrument_id)
1734            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1735        funding_rates_deque.push_front(funding_rate);
1736        Ok(())
1737    }
1738
1739    /// Adds the given `funding rates` to the cache.
1740    ///
1741    /// # Errors
1742    ///
1743    /// Returns an error if persisting the trade ticks to the backing database fails.
1744    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1745        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1746
1747        let instrument_id = funding_rates[0].instrument_id;
1748        log::debug!(
1749            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1750            funding_rates.len()
1751        );
1752
1753        if self.config.save_market_data
1754            && let Some(database) = &mut self.database
1755        {
1756            for funding_rate in funding_rates {
1757                database.add_funding_rate(funding_rate)?;
1758            }
1759        }
1760
1761        let funding_rate_deque = self
1762            .funding_rates
1763            .entry(instrument_id)
1764            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1765
1766        for funding_rate in funding_rates {
1767            funding_rate_deque.push_front(*funding_rate);
1768        }
1769        Ok(())
1770    }
1771
1772    /// Adds the `instrument_status` update to the cache.
1773    ///
1774    /// # Errors
1775    ///
1776    /// Returns an error if persisting the instrument status to the backing database fails.
1777    pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1778        log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1779
1780        if self.config.save_market_data {
1781            // TODO: Placeholder and return Result for consistency
1782        }
1783
1784        let statuses_deque = self
1785            .instrument_statuses
1786            .entry(status.instrument_id)
1787            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1788        statuses_deque.push_front(status);
1789        Ok(())
1790    }
1791
1792    /// Adds the `quote` tick to the cache.
1793    ///
1794    /// # Errors
1795    ///
1796    /// Returns an error if persisting the quote tick to the backing database fails.
1797    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1798        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1799
1800        if self.config.save_market_data
1801            && let Some(database) = &mut self.database
1802        {
1803            database.add_quote(&quote)?;
1804        }
1805
1806        let quotes_deque = self
1807            .quotes
1808            .entry(quote.instrument_id)
1809            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1810        quotes_deque.push_front(quote);
1811        Ok(())
1812    }
1813
1814    /// Adds the `quotes` to the cache.
1815    ///
1816    /// # Errors
1817    ///
1818    /// Returns an error if persisting the quote ticks to the backing database fails.
1819    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1820        check_slice_not_empty(quotes, stringify!(quotes))?;
1821
1822        let instrument_id = quotes[0].instrument_id;
1823        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1824
1825        if self.config.save_market_data
1826            && let Some(database) = &mut self.database
1827        {
1828            for quote in quotes {
1829                database.add_quote(quote)?;
1830            }
1831        }
1832
1833        let quotes_deque = self
1834            .quotes
1835            .entry(instrument_id)
1836            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1837
1838        for quote in quotes {
1839            quotes_deque.push_front(*quote);
1840        }
1841        Ok(())
1842    }
1843
1844    /// Adds the `trade` tick to the cache.
1845    ///
1846    /// # Errors
1847    ///
1848    /// Returns an error if persisting the trade tick to the backing database fails.
1849    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1850        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1851
1852        if self.config.save_market_data
1853            && let Some(database) = &mut self.database
1854        {
1855            database.add_trade(&trade)?;
1856        }
1857
1858        let trades_deque = self
1859            .trades
1860            .entry(trade.instrument_id)
1861            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1862        trades_deque.push_front(trade);
1863        Ok(())
1864    }
1865
1866    /// Adds the give `trades` to the cache.
1867    ///
1868    /// # Errors
1869    ///
1870    /// Returns an error if persisting the trade ticks to the backing database fails.
1871    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1872        check_slice_not_empty(trades, stringify!(trades))?;
1873
1874        let instrument_id = trades[0].instrument_id;
1875        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1876
1877        if self.config.save_market_data
1878            && let Some(database) = &mut self.database
1879        {
1880            for trade in trades {
1881                database.add_trade(trade)?;
1882            }
1883        }
1884
1885        let trades_deque = self
1886            .trades
1887            .entry(instrument_id)
1888            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1889
1890        for trade in trades {
1891            trades_deque.push_front(*trade);
1892        }
1893        Ok(())
1894    }
1895
1896    /// Adds the `bar` to the cache.
1897    ///
1898    /// # Errors
1899    ///
1900    /// Returns an error if persisting the bar to the backing database fails.
1901    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1902        log::debug!("Adding `Bar` {}", bar.bar_type);
1903
1904        if self.config.save_market_data
1905            && let Some(database) = &mut self.database
1906        {
1907            database.add_bar(&bar)?;
1908        }
1909
1910        let bars = self
1911            .bars
1912            .entry(bar.bar_type)
1913            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1914        bars.push_front(bar);
1915        Ok(())
1916    }
1917
1918    /// Adds the `bars` to the cache.
1919    ///
1920    /// # Errors
1921    ///
1922    /// Returns an error if persisting the bars to the backing database fails.
1923    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1924        check_slice_not_empty(bars, stringify!(bars))?;
1925
1926        let bar_type = bars[0].bar_type;
1927        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1928
1929        if self.config.save_market_data
1930            && let Some(database) = &mut self.database
1931        {
1932            for bar in bars {
1933                database.add_bar(bar)?;
1934            }
1935        }
1936
1937        let bars_deque = self
1938            .bars
1939            .entry(bar_type)
1940            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1941
1942        for bar in bars {
1943            bars_deque.push_front(*bar);
1944        }
1945        Ok(())
1946    }
1947
1948    /// Adds the `greeks` data to the cache.
1949    ///
1950    /// # Errors
1951    ///
1952    /// Returns an error if persisting the greeks data to the backing database fails.
1953    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1954        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1955
1956        if self.config.save_market_data
1957            && let Some(_database) = &mut self.database
1958        {
1959            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1960        }
1961
1962        self.greeks.insert(greeks.instrument_id, greeks);
1963        Ok(())
1964    }
1965
1966    /// Gets the greeks data for the `instrument_id`.
1967    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1968        self.greeks.get(instrument_id).cloned()
1969    }
1970
1971    /// Adds exchange-provided option greeks to the cache.
1972    pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1973        log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1974        self.option_greeks.insert(greeks.instrument_id, greeks);
1975    }
1976
1977    /// Gets a reference to the exchange-provided option greeks for the `instrument_id`.
1978    #[must_use]
1979    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1980        self.option_greeks.get(instrument_id)
1981    }
1982
1983    /// Adds the `yield_curve` data to the cache.
1984    ///
1985    /// # Errors
1986    ///
1987    /// Returns an error if persisting the yield curve data to the backing database fails.
1988    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1989        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1990
1991        if self.config.save_market_data
1992            && let Some(_database) = &mut self.database
1993        {
1994            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1995        }
1996
1997        self.yield_curves
1998            .insert(yield_curve.curve_name.clone(), yield_curve);
1999        Ok(())
2000    }
2001
2002    /// Gets the yield curve for the `key`.
2003    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
2004        self.yield_curves.get(key).map(|curve| {
2005            let curve_clone = curve.clone();
2006            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
2007                as Box<dyn Fn(f64) -> f64>
2008        })
2009    }
2010
2011    /// Adds the `currency` to the cache.
2012    ///
2013    /// # Errors
2014    ///
2015    /// Returns an error if persisting the currency to the backing database fails.
2016    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
2017        if self.currencies.contains_key(&currency.code) {
2018            return Ok(());
2019        }
2020        log::debug!("Adding `Currency` {}", currency.code);
2021
2022        if let Some(database) = &mut self.database {
2023            database.add_currency(&currency)?;
2024        }
2025
2026        self.currencies.insert(currency.code, currency);
2027        Ok(())
2028    }
2029
2030    /// Adds the `instrument` to the cache.
2031    ///
2032    /// # Errors
2033    ///
2034    /// Returns an error if persisting the instrument to the backing database fails.
2035    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
2036        log::debug!("Adding `Instrument` {}", instrument.id());
2037
2038        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
2039        if let Some(base_currency) = instrument.base_currency() {
2040            self.add_currency(base_currency)?;
2041        }
2042        self.add_currency(instrument.quote_currency())?;
2043        self.add_currency(instrument.settlement_currency())?;
2044
2045        if let Some(database) = &mut self.database {
2046            database.add_instrument(&instrument)?;
2047        }
2048
2049        self.instruments.insert(instrument.id(), instrument);
2050        Ok(())
2051    }
2052
2053    /// Adds the `synthetic` instrument to the cache.
2054    ///
2055    /// # Errors
2056    ///
2057    /// Returns an error if persisting the synthetic instrument to the backing database fails.
2058    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
2059        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
2060
2061        if let Some(database) = &mut self.database {
2062            database.add_synthetic(&synthetic)?;
2063        }
2064
2065        self.synthetics.insert(synthetic.id, synthetic);
2066        Ok(())
2067    }
2068
2069    /// Adds the `account` to the cache.
2070    ///
2071    /// # Errors
2072    ///
2073    /// Returns an error if persisting the account to the backing database fails.
2074    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2075        log::debug!("Adding `Account` {}", account.id());
2076
2077        if let Some(database) = &mut self.database {
2078            database.add_account(&account)?;
2079        }
2080
2081        let account_id = account.id();
2082        self.accounts.insert(account_id, SharedCell::new(account));
2083        self.index
2084            .venue_account
2085            .insert(account_id.get_issuer(), account_id);
2086        Ok(())
2087    }
2088
2089    /// Indexes the `client_order_id` with the `venue_order_id`.
2090    ///
2091    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
2092    ///
2093    /// # Errors
2094    ///
2095    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
2096    pub fn add_venue_order_id(
2097        &mut self,
2098        client_order_id: &ClientOrderId,
2099        venue_order_id: &VenueOrderId,
2100        overwrite: bool,
2101    ) -> anyhow::Result<()> {
2102        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
2103            && !overwrite
2104            && existing_venue_order_id != venue_order_id
2105        {
2106            anyhow::bail!(
2107                "Existing {existing_venue_order_id} for {client_order_id}
2108                    did not match the given {venue_order_id}.
2109                    If you are writing a test then try a different `venue_order_id`,
2110                    otherwise this is probably a bug."
2111            );
2112        }
2113
2114        self.index
2115            .client_order_ids
2116            .insert(*client_order_id, *venue_order_id);
2117        self.index
2118            .venue_order_ids
2119            .insert(*venue_order_id, *client_order_id);
2120
2121        Ok(())
2122    }
2123
2124    /// Adds the `order` to the cache indexed with any given identifiers.
2125    ///
2126    /// # Parameters
2127    ///
2128    /// `override_existing`: If the added order should 'override' any existing order and replace
2129    /// it in the cache. This is currently used for emulated orders which are
2130    /// being released and transformed into another type.
2131    ///
2132    /// # Errors
2133    ///
2134    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
2135    pub fn add_order(
2136        &mut self,
2137        order: OrderAny,
2138        position_id: Option<PositionId>,
2139        client_id: Option<ClientId>,
2140        replace_existing: bool,
2141    ) -> anyhow::Result<()> {
2142        let instrument_id = order.instrument_id();
2143        let venue = instrument_id.venue;
2144        let client_order_id = order.client_order_id();
2145        let strategy_id = order.strategy_id();
2146        let exec_algorithm_id = order.exec_algorithm_id();
2147        let exec_spawn_id = order.exec_spawn_id();
2148
2149        if !replace_existing {
2150            check_key_not_in_map(
2151                &client_order_id,
2152                &self.orders,
2153                stringify!(client_order_id),
2154                stringify!(orders),
2155            )?;
2156        }
2157
2158        log::debug!("Adding {order:?}");
2159
2160        self.index.orders.insert(client_order_id);
2161
2162        if order.is_active_local() {
2163            self.index.orders_active_local.insert(client_order_id);
2164        }
2165        self.index
2166            .order_strategy
2167            .insert(client_order_id, strategy_id);
2168        self.index.strategies.insert(strategy_id);
2169
2170        // Update venue -> orders index
2171        self.index
2172            .venue_orders
2173            .entry(venue)
2174            .or_default()
2175            .insert(client_order_id);
2176
2177        // Update instrument -> orders index
2178        self.index
2179            .instrument_orders
2180            .entry(instrument_id)
2181            .or_default()
2182            .insert(client_order_id);
2183
2184        // Update strategy -> orders index
2185        self.index
2186            .strategy_orders
2187            .entry(strategy_id)
2188            .or_default()
2189            .insert(client_order_id);
2190
2191        // Update account -> orders index (if account_id known at creation)
2192        if let Some(account_id) = order.account_id() {
2193            self.index
2194                .account_orders
2195                .entry(account_id)
2196                .or_default()
2197                .insert(client_order_id);
2198        }
2199
2200        // Update exec_algorithm -> orders index
2201        if let Some(exec_algorithm_id) = exec_algorithm_id {
2202            self.index.exec_algorithms.insert(exec_algorithm_id);
2203
2204            self.index
2205                .exec_algorithm_orders
2206                .entry(exec_algorithm_id)
2207                .or_default()
2208                .insert(client_order_id);
2209        }
2210
2211        // Update exec_spawn -> orders index
2212        if let Some(exec_spawn_id) = exec_spawn_id {
2213            self.index
2214                .exec_spawn_orders
2215                .entry(exec_spawn_id)
2216                .or_default()
2217                .insert(client_order_id);
2218        }
2219
2220        // Update emulation index
2221        if let Some(emulation_trigger) = order.emulation_trigger()
2222            && emulation_trigger != TriggerType::NoTrigger
2223        {
2224            self.index.orders_emulated.insert(client_order_id);
2225        }
2226
2227        // Index position ID if provided
2228        if let Some(position_id) = position_id {
2229            self.add_position_id(
2230                &position_id,
2231                &order.instrument_id().venue,
2232                &client_order_id,
2233                &strategy_id,
2234            )?;
2235        }
2236
2237        // Index client ID if provided
2238        if let Some(client_id) = client_id {
2239            self.index.order_client.insert(client_order_id, client_id);
2240            log::debug!("Indexed {client_id:?}");
2241        }
2242
2243        if let Some(database) = &mut self.database {
2244            database.add_order(&order, client_id)?;
2245            // TODO: Implement
2246            // if self.config.snapshot_orders {
2247            //     database.snapshot_order_state(order)?;
2248            // }
2249        }
2250
2251        match self.orders.get(&client_order_id) {
2252            // Reuse the existing cell on replace so the canonical entry stays in place
2253            // rather than orphaning a stale cell.
2254            Some(order_cell) => *order_cell.borrow_mut() = order,
2255            None => {
2256                self.orders.insert(client_order_id, SharedCell::new(order));
2257            }
2258        }
2259
2260        Ok(())
2261    }
2262
2263    /// Adds the `order_list` to the cache.
2264    ///
2265    /// # Errors
2266    ///
2267    /// Returns an error if the order list ID is already contained in the cache.
2268    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
2269        let order_list_id = order_list.id;
2270        check_key_not_in_map(
2271            &order_list_id,
2272            &self.order_lists,
2273            stringify!(order_list_id),
2274            stringify!(order_lists),
2275        )?;
2276
2277        log::debug!("Adding {order_list:?}");
2278        self.order_lists.insert(order_list_id, order_list);
2279        Ok(())
2280    }
2281
2282    /// Indexes the `position_id` with the other given IDs.
2283    ///
2284    /// # Errors
2285    ///
2286    /// Returns an error if indexing position ID in the backing database fails.
2287    pub fn add_position_id(
2288        &mut self,
2289        position_id: &PositionId,
2290        venue: &Venue,
2291        client_order_id: &ClientOrderId,
2292        strategy_id: &StrategyId,
2293    ) -> anyhow::Result<()> {
2294        self.index
2295            .order_position
2296            .insert(*client_order_id, *position_id);
2297
2298        // Index: ClientOrderId -> PositionId
2299        if let Some(database) = &mut self.database {
2300            database.index_order_position(*client_order_id, *position_id)?;
2301        }
2302
2303        // Index: PositionId -> StrategyId
2304        self.index
2305            .position_strategy
2306            .insert(*position_id, *strategy_id);
2307
2308        // Index: PositionId -> set[ClientOrderId]
2309        self.index
2310            .position_orders
2311            .entry(*position_id)
2312            .or_default()
2313            .insert(*client_order_id);
2314
2315        // Index: StrategyId -> set[PositionId]
2316        self.index
2317            .strategy_positions
2318            .entry(*strategy_id)
2319            .or_default()
2320            .insert(*position_id);
2321
2322        // Index: Venue -> set[PositionId]
2323        self.index
2324            .venue_positions
2325            .entry(*venue)
2326            .or_default()
2327            .insert(*position_id);
2328
2329        Ok(())
2330    }
2331
2332    // Propagates parent OTO `position_id` to contingent children that are missing one.
2333    //
2334    // Recovers from a partial-write window during fill handling: the fill-time path in the
2335    // execution engine assigns `position_id` to each contingent child in a non-atomic loop
2336    // (`set_position_id` then `add_position_id`), so a crash mid-loop can leave the database
2337    // with the parent updated and some children un-updated. This pass re-applies any missing
2338    // assignments after load. Mirrors the Cython behaviour at
2339    // `nautilus_trader/cache/cache.pyx::_assign_position_id_to_contingencies`.
2340    fn assign_position_ids_to_contingencies(&mut self) {
2341        let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2342
2343        for parent_order_cell in self.orders.values() {
2344            let parent = parent_order_cell.borrow();
2345            if parent.contingency_type() != Some(ContingencyType::Oto) {
2346                continue;
2347            }
2348            let Some(parent_position_id) = parent.position_id() else {
2349                continue;
2350            };
2351            let Some(linked_order_ids) = parent.linked_order_ids() else {
2352                continue;
2353            };
2354
2355            for client_order_id in linked_order_ids {
2356                match self.orders.get(client_order_id) {
2357                    None => {
2358                        log::error!("Contingency order {client_order_id} not found");
2359                    }
2360                    Some(contingent_order_cell) => {
2361                        if contingent_order_cell.borrow().position_id().is_none() {
2362                            assignments.push((parent_position_id, *client_order_id));
2363                        }
2364                    }
2365                }
2366            }
2367        }
2368
2369        for (position_id, client_order_id) in assignments {
2370            let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
2371                let mut contingent = order_cell.borrow_mut();
2372                contingent.set_position_id(Some(position_id));
2373                (contingent.instrument_id().venue, contingent.strategy_id())
2374            }) else {
2375                continue;
2376            };
2377
2378            // In-memory index updates only. The persistent index entry (if any) was written by
2379            // the original fill-time `add_position_id` call; replaying the database write here
2380            // would invoke `CacheDatabaseAdapter::index_order_position`, which is currently
2381            // `todo!()` on both the Redis and SQL adapters. Until those land, the load-time
2382            // recovery is in-memory-only: sufficient for the current process to operate, but
2383            // not durable across another restart.
2384            self.index
2385                .order_position
2386                .insert(client_order_id, position_id);
2387            self.index
2388                .position_strategy
2389                .insert(position_id, strategy_id);
2390            self.index
2391                .position_orders
2392                .entry(position_id)
2393                .or_default()
2394                .insert(client_order_id);
2395            self.index
2396                .strategy_positions
2397                .entry(strategy_id)
2398                .or_default()
2399                .insert(position_id);
2400            self.index
2401                .venue_positions
2402                .entry(venue)
2403                .or_default()
2404                .insert(position_id);
2405        }
2406    }
2407
2408    /// Adds the `position` to the cache.
2409    ///
2410    /// # Errors
2411    ///
2412    /// Returns an error if persisting the position to the backing database fails.
2413    pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2414        self.positions
2415            .insert(position.id, SharedCell::new(position.clone()));
2416        self.index.positions.insert(position.id);
2417        self.index.positions_open.insert(position.id);
2418        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
2419
2420        log::debug!("Adding {position}");
2421
2422        self.add_position_id(
2423            &position.id,
2424            &position.instrument_id.venue,
2425            &position.opening_order_id,
2426            &position.strategy_id,
2427        )?;
2428
2429        let venue = position.instrument_id.venue;
2430        let venue_positions = self.index.venue_positions.entry(venue).or_default();
2431        venue_positions.insert(position.id);
2432
2433        // Index: InstrumentId -> AHashSet
2434        let instrument_id = position.instrument_id;
2435        let instrument_positions = self
2436            .index
2437            .instrument_positions
2438            .entry(instrument_id)
2439            .or_default();
2440        instrument_positions.insert(position.id);
2441
2442        // Index: AccountId -> AHashSet<PositionId>
2443        self.index
2444            .account_positions
2445            .entry(position.account_id)
2446            .or_default()
2447            .insert(position.id);
2448
2449        if let Some(database) = &mut self.database {
2450            database.add_position(position)?;
2451            // TODO: Implement position snapshots
2452            // if self.snapshot_positions {
2453            //     database.snapshot_position_state(
2454            //         position,
2455            //         position.ts_last,
2456            //         self.calculate_unrealized_pnl(&position),
2457            //     )?;
2458            // }
2459        }
2460
2461        Ok(())
2462    }
2463
2464    /// Updates the `account` in the cache.
2465    ///
2466    /// Reuses the existing cell when present so any held [`AccountRef`] handles continue to point
2467    /// at the canonical entry; only inserts a new cell when the account is unknown.
2468    ///
2469    /// # Errors
2470    ///
2471    /// Returns an error if updating the account in the database fails.
2472    pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2473        let account_id = account.id();
2474        match self.accounts.get(&account_id) {
2475            Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
2476            None => {
2477                self.accounts
2478                    .insert(account_id, SharedCell::new(account.clone()));
2479            }
2480        }
2481
2482        if let Some(database) = &mut self.database {
2483            database.update_account(account)?;
2484        }
2485        Ok(())
2486    }
2487
2488    /// Removes the `account` from the cache and returns it.
2489    ///
2490    /// This supports hot paths which need owned account mutation without
2491    /// cloning the account event history. The cache is the sole owner of the
2492    /// account cell (the field is private and accessors only hand out
2493    /// lifetime-scoped [`AccountRef`] borrows), so the value is moved out of
2494    /// its cell rather than cloned.
2495    ///
2496    /// # Panics
2497    ///
2498    /// Panics if the cache no longer holds the only strong handle to the
2499    /// account cell. This indicates an internal invariant violation: some
2500    /// component cloned the underlying [`SharedCell`] and held it past the
2501    /// scope of a single cache method.
2502    #[must_use]
2503    pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
2504        self.accounts.remove(account_id).map(|cell| {
2505            let rc: Rc<RefCell<AccountAny>> = cell.into();
2506            Rc::try_unwrap(rc).map_or_else(
2507                |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
2508                RefCell::into_inner,
2509            )
2510        })
2511    }
2512
2513    /// Caches the `account` in memory without updating the database.
2514    pub fn cache_account_owned(&mut self, account: AccountAny) {
2515        let account_id = account.id();
2516        self.index
2517            .venue_account
2518            .insert(account_id.get_issuer(), account_id);
2519        match self.accounts.get(&account_id) {
2520            Some(account_cell) => *account_cell.borrow_mut() = account,
2521            None => {
2522                self.accounts.insert(account_id, SharedCell::new(account));
2523            }
2524        }
2525    }
2526
2527    /// Updates the `account` in the cache, taking ownership of the updated account.
2528    ///
2529    /// # Errors
2530    ///
2531    /// Returns an error if updating the account in the database fails.
2532    pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
2533        let account_id = account.id();
2534        self.cache_account_owned(account);
2535
2536        if let Some(database) = &mut self.database {
2537            let Some(account_cell) = self.accounts.get(&account_id) else {
2538                anyhow::bail!("Account {account_id} not found after cache update");
2539            };
2540            database.update_account(&account_cell.borrow())?;
2541        }
2542        Ok(())
2543    }
2544
2545    /// Applies an account state event to the cached account.
2546    ///
2547    /// Mutates the cached account in place to avoid cloning the account event
2548    /// history on the hot path; long-running sessions accumulate many events
2549    /// per account, so a snapshot-clone here would be O(history) per update.
2550    ///
2551    /// # Errors
2552    ///
2553    /// Returns an error if applying or persisting the account state fails.
2554    pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
2555        let Some(cell) = self.accounts.get(&event.account_id) else {
2556            return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
2557        };
2558
2559        cell.borrow_mut().apply(event.clone())?;
2560
2561        if let Some(database) = &mut self.database {
2562            database.update_account(&cell.borrow())?;
2563        }
2564        Ok(())
2565    }
2566
2567    /// Replaces the cached `order` from a non-event snapshot.
2568    ///
2569    /// Prefer [`Self::update_order`] for lifecycle state changes. Use this only for order state
2570    /// that is not represented by [`OrderEventAny`].
2571    ///
2572    /// # Errors
2573    ///
2574    /// Returns an error if updating the order indexes or database fails.
2575    pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2576        self.refresh_order(order)?;
2577
2578        let client_order_id = order.client_order_id();
2579        match self.orders.get(&client_order_id) {
2580            // Reuse the existing cell so the canonical entry stays in place rather than
2581            // orphaning a stale cell.
2582            Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
2583            None => {
2584                self.orders
2585                    .insert(client_order_id, SharedCell::new(order.clone()));
2586            }
2587        }
2588
2589        Ok(())
2590    }
2591
2592    /// Updates the cached order by applying an event and refreshing derived cache state.
2593    ///
2594    /// # Errors
2595    ///
2596    /// Returns an error if the order is not found or rejects the event.
2597    pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
2598        let event_client_order_id = event.client_order_id();
2599        let client_order_id = if self.order_exists(&event_client_order_id) {
2600            event_client_order_id
2601        } else if let Some(venue_order_id) = event.venue_order_id() {
2602            self.index
2603                .venue_order_ids
2604                .get(&venue_order_id)
2605                .copied()
2606                .ok_or(OrderError::NotFound(event_client_order_id))?
2607        } else {
2608            return Err(OrderError::NotFound(event_client_order_id).into());
2609        };
2610
2611        let order_cell = self
2612            .orders
2613            .get(&client_order_id)
2614            .cloned()
2615            .ok_or(OrderError::NotFound(client_order_id))?;
2616
2617        // Apply on a snapshot first so a fallible `apply` (e.g. invalid state
2618        // transition) leaves the canonical cell untouched. On success we swap the
2619        // post-event value back into the cell so subsequent reads see the new state.
2620        let mut snapshot = order_cell.borrow().clone();
2621        snapshot.apply(event.clone())?;
2622        *order_cell.borrow_mut() = snapshot.clone();
2623
2624        if let Err(e) = self.refresh_order(&snapshot) {
2625            log::error!("Error updating order in cache: {e}");
2626        }
2627
2628        Ok(snapshot)
2629    }
2630
2631    fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2632        let client_order_id = order.client_order_id();
2633
2634        if order.is_active_local() {
2635            self.index.orders_active_local.insert(client_order_id);
2636        } else {
2637            self.index.orders_active_local.remove(&client_order_id);
2638        }
2639
2640        // Update venue order ID
2641        if let Some(venue_order_id) = order.venue_order_id() {
2642            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2643            // venues which use a cancel+replace update strategy.
2644            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2645                let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
2646                if let Err(e) =
2647                    self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
2648                {
2649                    log::error!("Error indexing venue order ID in cache: {e}");
2650                }
2651            }
2652        }
2653
2654        // Update in-flight state
2655        if order.is_inflight() {
2656            self.index.orders_inflight.insert(client_order_id);
2657        } else {
2658            self.index.orders_inflight.remove(&client_order_id);
2659        }
2660
2661        // Update open/closed state
2662        if order.is_open() {
2663            self.index.orders_closed.remove(&client_order_id);
2664            self.index.orders_open.insert(client_order_id);
2665        } else if order.is_closed() {
2666            self.index.orders_open.remove(&client_order_id);
2667            self.index.orders_pending_cancel.remove(&client_order_id);
2668            self.index.orders_closed.insert(client_order_id);
2669        }
2670
2671        // Update emulation index
2672        if let Some(emulation_trigger) = order.emulation_trigger()
2673            && emulation_trigger != TriggerType::NoTrigger
2674            && !order.is_closed()
2675        {
2676            self.index.orders_emulated.insert(client_order_id);
2677        } else {
2678            self.index.orders_emulated.remove(&client_order_id);
2679        }
2680
2681        // Update account orders index when account_id becomes available
2682        if let Some(account_id) = order.account_id() {
2683            self.index
2684                .account_orders
2685                .entry(account_id)
2686                .or_default()
2687                .insert(client_order_id);
2688        }
2689
2690        // Update own book
2691        if !self.own_books.is_empty() {
2692            let own_book = self.own_order_book(&order.instrument_id());
2693            if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
2694                self.update_own_order_book(order);
2695            }
2696        }
2697
2698        if let Some(database) = &mut self.database {
2699            database.update_order(order.last_event())?;
2700            // TODO: Implement order snapshots
2701            // if self.snapshot_orders {
2702            //     database.snapshot_order_state(order)?;
2703            // }
2704        }
2705
2706        Ok(())
2707    }
2708
2709    /// Updates the `order` as pending cancel locally.
2710    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2711        self.index
2712            .orders_pending_cancel
2713            .insert(order.client_order_id());
2714    }
2715
2716    /// Updates the `position` in the cache.
2717    ///
2718    /// Reuses the existing cell when present so any held [`PositionRef`] handles continue to point
2719    /// at the canonical entry; only inserts a new cell when the position is unknown.
2720    ///
2721    /// # Errors
2722    ///
2723    /// Returns an error if updating the position in the database fails.
2724    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2725        // Update open/closed state
2726
2727        if position.is_open() {
2728            self.index.positions_open.insert(position.id);
2729            self.index.positions_closed.remove(&position.id);
2730        } else {
2731            self.index.positions_closed.insert(position.id);
2732            self.index.positions_open.remove(&position.id);
2733        }
2734
2735        if let Some(database) = &mut self.database {
2736            database.update_position(position)?;
2737            // TODO: Implement order snapshots
2738            // if self.snapshot_orders {
2739            //     database.snapshot_order_state(order)?;
2740            // }
2741        }
2742
2743        match self.positions.get(&position.id) {
2744            Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
2745            None => {
2746                self.positions
2747                    .insert(position.id, SharedCell::new(position.clone()));
2748            }
2749        }
2750
2751        Ok(())
2752    }
2753
2754    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2755    /// serializing it, and storing it in the position snapshots.
2756    ///
2757    /// # Errors
2758    ///
2759    /// Returns an error if serializing or storing the position snapshot fails.
2760    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
2761        let position_id = position.id;
2762
2763        let mut copied_position = position.clone();
2764        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2765        copied_position.id = PositionId::new(new_id);
2766
2767        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2768        let position_serialized = serde_json::to_vec(&copied_position)?;
2769        let snapshot_index = self.position_snapshot_count(&position_id);
2770        let blob_ref = format!(
2771            "cache://position-snapshots/{}/{}",
2772            position_id.as_str(),
2773            snapshot_index,
2774        );
2775        let snapshot_blob = Bytes::from(position_serialized);
2776
2777        self.add(&blob_ref, snapshot_blob.clone())?;
2778        self.position_snapshots
2779            .entry(position_id)
2780            .or_default()
2781            .push(snapshot_blob.clone());
2782
2783        log::debug!("Snapshot {copied_position}");
2784        Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
2785    }
2786
2787    /// Loads the cache-owned snapshot blob stored under `blob_ref`.
2788    ///
2789    /// The cache first checks in-memory snapshot state. When the blob is not present and a
2790    /// database adapter exists, the generic cache entries are loaded and checked for the same
2791    /// opaque reference.
2792    ///
2793    /// # Errors
2794    ///
2795    /// Returns an error if loading generic cache entries from the backing database fails.
2796    pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
2797        if let Some(blob) = self.snapshot_blob(blob_ref) {
2798            return Ok(Some(blob));
2799        }
2800
2801        if self.database.is_some() {
2802            self.cache_general()?;
2803        }
2804
2805        Ok(self.snapshot_blob(blob_ref))
2806    }
2807
2808    /// Restores the cache-owned snapshot blob stored under `blob_ref`.
2809    ///
2810    /// Only cache-owned `cache://position-snapshots/...` blobs are currently supported.
2811    ///
2812    /// # Errors
2813    ///
2814    /// Returns an error if the blob reference is unsupported, malformed, skips earlier
2815    /// snapshot frames, conflicts with an existing frame, or does not decode to the expected
2816    /// position snapshot.
2817    pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
2818        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
2819        validate_position_snapshot_blob(&position_id, blob.as_ref())?;
2820
2821        let frames = self.position_snapshots.entry(position_id).or_default();
2822        match frames.get(snapshot_index) {
2823            Some(existing) if existing == &blob => {}
2824            Some(_) => {
2825                anyhow::bail!(
2826                    "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
2827                );
2828            }
2829            None if frames.len() == snapshot_index => frames.push(blob.clone()),
2830            None => {
2831                anyhow::bail!(
2832                    "position snapshot blob_ref {blob_ref} skips missing frame {}",
2833                    frames.len()
2834                );
2835            }
2836        }
2837
2838        self.general.insert(blob_ref.to_string(), blob);
2839        Ok(())
2840    }
2841
2842    fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
2843        if let Some(blob) = self.general.get(blob_ref) {
2844            return Some(blob.clone());
2845        }
2846
2847        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
2848        self.position_snapshots
2849            .get(&position_id)
2850            .and_then(|frames| frames.get(snapshot_index))
2851            .cloned()
2852    }
2853
2854    /// Creates a snapshot of the `position` state in the database.
2855    ///
2856    /// # Errors
2857    ///
2858    /// Returns an error if snapshotting the position state fails.
2859    pub fn snapshot_position_state(
2860        &mut self,
2861        position: &Position,
2862        // ts_snapshot: u64,
2863        // unrealized_pnl: Option<Money>,
2864        open_only: Option<bool>,
2865    ) -> anyhow::Result<()> {
2866        let open_only = open_only.unwrap_or(true);
2867
2868        if open_only && !position.is_open() {
2869            return Ok(());
2870        }
2871
2872        if let Some(database) = &mut self.database {
2873            database.snapshot_position_state(position).map_err(|e| {
2874                log::error!(
2875                    "Failed to snapshot position state for {}: {e:?}",
2876                    position.id
2877                );
2878                e
2879            })?;
2880        } else {
2881            log::warn!(
2882                "Cannot snapshot position state for {} (no database configured)",
2883                position.id
2884            );
2885        }
2886
2887        // Ok(())
2888        todo!()
2889    }
2890
2891    /// Gets the OMS type for the `position_id`.
2892    #[must_use]
2893    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2894        // Get OMS type from the index
2895        if self.index.position_strategy.contains_key(position_id) {
2896            // For now, we'll default to NETTING
2897            // TODO: Store and retrieve actual OMS type per position
2898            Some(OmsType::Netting)
2899        } else {
2900            None
2901        }
2902    }
2903
2904    /// Gets the serialized position snapshot frames for the `position_id`.
2905    ///
2906    /// Each element in the returned vector is one JSON-encoded [`Position`] snapshot,
2907    /// in the order they were taken.
2908    #[must_use]
2909    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2910        self.position_snapshots
2911            .get(position_id)
2912            .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2913    }
2914
2915    /// Returns the number of stored snapshot frames for the `position_id`.
2916    ///
2917    /// Returns `0` when no frames are stored. Does not allocate or copy frame bytes.
2918    #[must_use]
2919    pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2920        self.position_snapshots.get(position_id).map_or(0, Vec::len)
2921    }
2922
2923    /// Returns all position snapshots with the given optional filters.
2924    ///
2925    /// When `position_id` is `Some`, only snapshots for that position are returned.
2926    /// When `account_id` is `Some`, snapshots are filtered to that account.
2927    /// Frames that fail to deserialize are skipped with a warning.
2928    #[must_use]
2929    pub fn position_snapshots(
2930        &self,
2931        position_id: Option<&PositionId>,
2932        account_id: Option<&AccountId>,
2933    ) -> Vec<Position> {
2934        let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2935            Some(pid) => match self.position_snapshots.get(pid) {
2936                Some(v) => Box::new(v.iter()),
2937                None => Box::new(std::iter::empty()),
2938            },
2939            None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2940        };
2941
2942        let mut results: Vec<Position> = frames
2943            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2944                Ok(position) => Some(position),
2945                Err(e) => {
2946                    log::warn!("Failed to decode position snapshot: {e}");
2947                    None
2948                }
2949            })
2950            .collect();
2951
2952        if let Some(aid) = account_id {
2953            results.retain(|p| p.account_id == *aid);
2954        }
2955
2956        results
2957    }
2958
2959    /// Returns position snapshots for `position_id` starting from the `skip`th frame.
2960    ///
2961    /// Use this to deserialize only newly appended snapshots when the caller already
2962    /// processed earlier frames. Returns an empty vector when no frames or fewer than
2963    /// `skip` frames are stored. Frames that fail to deserialize are skipped with a warning.
2964    #[must_use]
2965    pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2966        let Some(frames) = self.position_snapshots.get(position_id) else {
2967            return Vec::new();
2968        };
2969
2970        frames
2971            .iter()
2972            .skip(skip)
2973            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2974                Ok(position) => Some(position),
2975                Err(e) => {
2976                    log::warn!("Failed to decode position snapshot: {e}");
2977                    None
2978                }
2979            })
2980            .collect()
2981    }
2982
2983    /// Gets position snapshot IDs for the `instrument_id`.
2984    #[must_use]
2985    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2986        // Get snapshot position IDs that match the instrument
2987        let mut result = AHashSet::new();
2988
2989        for (position_id, _) in &self.position_snapshots {
2990            // Check if this position is for the requested instrument
2991            if let Some(position_cell) = self.positions.get(position_id)
2992                && position_cell.borrow().instrument_id == *instrument_id
2993            {
2994                result.insert(*position_id);
2995            }
2996        }
2997        result
2998    }
2999
3000    /// Snapshots the `order` state in the database.
3001    ///
3002    /// # Errors
3003    ///
3004    /// Returns an error if snapshotting the order state fails.
3005    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
3006        let Some(database) = &self.database else {
3007            log::warn!(
3008                "Cannot snapshot order state for {} (no database configured)",
3009                order.client_order_id()
3010            );
3011            return Ok(());
3012        };
3013
3014        database.snapshot_order_state(order)
3015    }
3016
3017    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
3018
3019    // Collects references to the index sets that constrain an order query.
3020    //
3021    // Returns:
3022    // - `FilterSources::Unfiltered` when no filter is provided (the caller should iterate
3023    //   the full bucket).
3024    // - `FilterSources::Empty` when a filter is provided but the index has no entry for it
3025    //   (the resolved set is unconditionally empty, no further work needed).
3026    // - `FilterSources::Sets` with borrowed references to each filter source set.
3027    fn collect_order_filter_sources<'a>(
3028        &'a self,
3029        venue: Option<&Venue>,
3030        instrument_id: Option<&InstrumentId>,
3031        strategy_id: Option<&StrategyId>,
3032        account_id: Option<&AccountId>,
3033    ) -> FilterSources<'a, ClientOrderId> {
3034        let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
3035
3036        if let Some(venue) = venue {
3037            match self.index.venue_orders.get(venue) {
3038                Some(set) => sources.push(set),
3039                None => return FilterSources::Empty,
3040            }
3041        }
3042
3043        if let Some(instrument_id) = instrument_id {
3044            match self.index.instrument_orders.get(instrument_id) {
3045                Some(set) => sources.push(set),
3046                None => return FilterSources::Empty,
3047            }
3048        }
3049
3050        if let Some(strategy_id) = strategy_id {
3051            match self.index.strategy_orders.get(strategy_id) {
3052                Some(set) => sources.push(set),
3053                None => return FilterSources::Empty,
3054            }
3055        }
3056
3057        if let Some(account_id) = account_id {
3058            match self.index.account_orders.get(account_id) {
3059                Some(set) => sources.push(set),
3060                None => return FilterSources::Empty,
3061            }
3062        }
3063
3064        if sources.is_empty() {
3065            FilterSources::Unfiltered
3066        } else {
3067            FilterSources::Sets(sources)
3068        }
3069    }
3070
3071    fn collect_position_filter_sources<'a>(
3072        &'a self,
3073        venue: Option<&Venue>,
3074        instrument_id: Option<&InstrumentId>,
3075        strategy_id: Option<&StrategyId>,
3076        account_id: Option<&AccountId>,
3077    ) -> FilterSources<'a, PositionId> {
3078        let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
3079
3080        if let Some(venue) = venue {
3081            match self.index.venue_positions.get(venue) {
3082                Some(set) => sources.push(set),
3083                None => return FilterSources::Empty,
3084            }
3085        }
3086
3087        if let Some(instrument_id) = instrument_id {
3088            match self.index.instrument_positions.get(instrument_id) {
3089                Some(set) => sources.push(set),
3090                None => return FilterSources::Empty,
3091            }
3092        }
3093
3094        if let Some(strategy_id) = strategy_id {
3095            match self.index.strategy_positions.get(strategy_id) {
3096                Some(set) => sources.push(set),
3097                None => return FilterSources::Empty,
3098            }
3099        }
3100
3101        if let Some(account_id) = account_id {
3102            match self.index.account_positions.get(account_id) {
3103                Some(set) => sources.push(set),
3104                None => return FilterSources::Empty,
3105            }
3106        }
3107
3108        if sources.is_empty() {
3109            FilterSources::Unfiltered
3110        } else {
3111            FilterSources::Sets(sources)
3112        }
3113    }
3114
3115    // Materializes the `ClientOrderId`s in `bucket` matching the optional filter parameters.
3116    //
3117    // Folds the bucket into the filter sources and runs a single size-ordered intersection,
3118    // avoiding the legacy two-step build-filter-set + bucket-intersection that allocated and
3119    // rehashed twice.
3120    fn query_orders_in_bucket(
3121        &self,
3122        bucket: &AHashSet<ClientOrderId>,
3123        venue: Option<&Venue>,
3124        instrument_id: Option<&InstrumentId>,
3125        strategy_id: Option<&StrategyId>,
3126        account_id: Option<&AccountId>,
3127    ) -> AHashSet<ClientOrderId> {
3128        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3129            FilterSources::Empty => AHashSet::new(),
3130            FilterSources::Unfiltered => bucket.clone(),
3131            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3132        }
3133    }
3134
3135    fn query_positions_in_bucket(
3136        &self,
3137        bucket: &AHashSet<PositionId>,
3138        venue: Option<&Venue>,
3139        instrument_id: Option<&InstrumentId>,
3140        strategy_id: Option<&StrategyId>,
3141        account_id: Option<&AccountId>,
3142    ) -> AHashSet<PositionId> {
3143        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3144            FilterSources::Empty => AHashSet::new(),
3145            FilterSources::Unfiltered => bucket.clone(),
3146            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3147        }
3148    }
3149
3150    // Returns a borrowed or owned view of the orders in `bucket` matching the optional filter
3151    // parameters. Avoids cloning the bucket when no filter narrows it.
3152    fn view_orders_in_bucket<'a>(
3153        &'a self,
3154        bucket: &'a AHashSet<ClientOrderId>,
3155        venue: Option<&Venue>,
3156        instrument_id: Option<&InstrumentId>,
3157        strategy_id: Option<&StrategyId>,
3158        account_id: Option<&AccountId>,
3159    ) -> Cow<'a, AHashSet<ClientOrderId>> {
3160        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3161            FilterSources::Empty => Cow::Owned(AHashSet::new()),
3162            FilterSources::Unfiltered => Cow::Borrowed(bucket),
3163            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3164        }
3165    }
3166
3167    fn view_positions_in_bucket<'a>(
3168        &'a self,
3169        bucket: &'a AHashSet<PositionId>,
3170        venue: Option<&Venue>,
3171        instrument_id: Option<&InstrumentId>,
3172        strategy_id: Option<&StrategyId>,
3173        account_id: Option<&AccountId>,
3174    ) -> Cow<'a, AHashSet<PositionId>> {
3175        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3176            FilterSources::Empty => Cow::Owned(AHashSet::new()),
3177            FilterSources::Unfiltered => Cow::Borrowed(bucket),
3178            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3179        }
3180    }
3181
3182    // Returns a lazy iterator yielding the [`ClientOrderId`]s in `bucket` matching the optional
3183    // filter parameters. Avoids any [`Vec`] or [`AHashSet`] materialization in the result path,
3184    // and (for multi-filter calls) drives intersection from the smallest source while looking
3185    // up membership in the rest.
3186    fn iter_orders_in_bucket<'a>(
3187        &'a self,
3188        bucket: &'a AHashSet<ClientOrderId>,
3189        venue: Option<&Venue>,
3190        instrument_id: Option<&InstrumentId>,
3191        strategy_id: Option<&StrategyId>,
3192        account_id: Option<&AccountId>,
3193    ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
3194        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3195            FilterSources::Empty => Box::new(std::iter::empty()),
3196            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3197            FilterSources::Sets(mut sources) => {
3198                sources.push(bucket);
3199                sources.sort_unstable_by_key(|s| s.len());
3200                let driver = sources[0];
3201                let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
3202                Box::new(
3203                    driver
3204                        .iter()
3205                        .copied()
3206                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
3207                )
3208            }
3209        }
3210    }
3211
3212    fn iter_positions_in_bucket<'a>(
3213        &'a self,
3214        bucket: &'a AHashSet<PositionId>,
3215        venue: Option<&Venue>,
3216        instrument_id: Option<&InstrumentId>,
3217        strategy_id: Option<&StrategyId>,
3218        account_id: Option<&AccountId>,
3219    ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
3220        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3221            FilterSources::Empty => Box::new(std::iter::empty()),
3222            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3223            FilterSources::Sets(mut sources) => {
3224                sources.push(bucket);
3225                sources.sort_unstable_by_key(|s| s.len());
3226                let driver = sources[0];
3227                let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
3228                Box::new(
3229                    driver
3230                        .iter()
3231                        .copied()
3232                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
3233                )
3234            }
3235        }
3236    }
3237
3238    // Counts orders in `bucket` matching the optional filter parameters.
3239    //
3240    // Drives intersection from the smallest filter source (or the bucket itself when no filter
3241    // is provided) and short-circuits by counting rather than collecting. With a side filter,
3242    // each candidate order is borrowed via its cell only long enough to inspect the side.
3243    fn count_orders_in_bucket(
3244        &self,
3245        bucket: &AHashSet<ClientOrderId>,
3246        venue: Option<&Venue>,
3247        instrument_id: Option<&InstrumentId>,
3248        strategy_id: Option<&StrategyId>,
3249        account_id: Option<&AccountId>,
3250        side: Option<OrderSide>,
3251    ) -> usize {
3252        let side = side.unwrap_or(OrderSide::NoOrderSide);
3253
3254        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3255            FilterSources::Empty => 0,
3256            FilterSources::Unfiltered => {
3257                if side == OrderSide::NoOrderSide {
3258                    bucket.len()
3259                } else {
3260                    bucket
3261                        .iter()
3262                        .filter(|id| self.order_side_matches(id, side))
3263                        .count()
3264                }
3265            }
3266            FilterSources::Sets(mut sources) => {
3267                sources.push(bucket);
3268                sources.sort_unstable_by_key(|s| s.len());
3269                let driver = sources[0];
3270                let rest = &sources[1..];
3271
3272                driver
3273                    .iter()
3274                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3275                    .filter(|id| {
3276                        side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
3277                    })
3278                    .count()
3279            }
3280        }
3281    }
3282
3283    fn count_positions_in_bucket(
3284        &self,
3285        bucket: &AHashSet<PositionId>,
3286        venue: Option<&Venue>,
3287        instrument_id: Option<&InstrumentId>,
3288        strategy_id: Option<&StrategyId>,
3289        account_id: Option<&AccountId>,
3290        side: Option<PositionSide>,
3291    ) -> usize {
3292        let side = side.unwrap_or(PositionSide::NoPositionSide);
3293
3294        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3295            FilterSources::Empty => 0,
3296            FilterSources::Unfiltered => {
3297                if side == PositionSide::NoPositionSide {
3298                    bucket.len()
3299                } else {
3300                    bucket
3301                        .iter()
3302                        .filter(|id| self.position_side_matches(id, side))
3303                        .count()
3304                }
3305            }
3306            FilterSources::Sets(mut sources) => {
3307                sources.push(bucket);
3308                sources.sort_unstable_by_key(|s| s.len());
3309                let driver = sources[0];
3310                let rest = &sources[1..];
3311
3312                driver
3313                    .iter()
3314                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3315                    .filter(|id| {
3316                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3317                    })
3318                    .count()
3319            }
3320        }
3321    }
3322
3323    // Returns whether any order in `bucket` matches the optional filter parameters.
3324    //
3325    // Mirrors `count_orders_in_bucket` but short-circuits on the first match. Useful for
3326    // `is_empty`-style gating in hot paths where the caller only needs to know whether at
3327    // least one matching order exists.
3328    fn any_orders_in_bucket(
3329        &self,
3330        bucket: &AHashSet<ClientOrderId>,
3331        venue: Option<&Venue>,
3332        instrument_id: Option<&InstrumentId>,
3333        strategy_id: Option<&StrategyId>,
3334        account_id: Option<&AccountId>,
3335        side: Option<OrderSide>,
3336    ) -> bool {
3337        let side = side.unwrap_or(OrderSide::NoOrderSide);
3338
3339        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3340            FilterSources::Empty => false,
3341            FilterSources::Unfiltered => {
3342                if side == OrderSide::NoOrderSide {
3343                    !bucket.is_empty()
3344                } else {
3345                    bucket.iter().any(|id| self.order_side_matches(id, side))
3346                }
3347            }
3348            FilterSources::Sets(mut sources) => {
3349                sources.push(bucket);
3350                sources.sort_unstable_by_key(|s| s.len());
3351                let driver = sources[0];
3352                let rest = &sources[1..];
3353
3354                driver
3355                    .iter()
3356                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3357                    .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
3358            }
3359        }
3360    }
3361
3362    fn any_positions_in_bucket(
3363        &self,
3364        bucket: &AHashSet<PositionId>,
3365        venue: Option<&Venue>,
3366        instrument_id: Option<&InstrumentId>,
3367        strategy_id: Option<&StrategyId>,
3368        account_id: Option<&AccountId>,
3369        side: Option<PositionSide>,
3370    ) -> bool {
3371        let side = side.unwrap_or(PositionSide::NoPositionSide);
3372
3373        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3374            FilterSources::Empty => false,
3375            FilterSources::Unfiltered => {
3376                if side == PositionSide::NoPositionSide {
3377                    !bucket.is_empty()
3378                } else {
3379                    bucket.iter().any(|id| self.position_side_matches(id, side))
3380                }
3381            }
3382            FilterSources::Sets(mut sources) => {
3383                sources.push(bucket);
3384                sources.sort_unstable_by_key(|s| s.len());
3385                let driver = sources[0];
3386                let rest = &sources[1..];
3387
3388                driver
3389                    .iter()
3390                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3391                    .any(|id| {
3392                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3393                    })
3394            }
3395        }
3396    }
3397
3398    fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
3399        self.orders
3400            .get(client_order_id)
3401            .is_some_and(|cell| cell.borrow().order_side() == side)
3402    }
3403
3404    fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
3405        self.positions
3406            .get(position_id)
3407            .is_some_and(|cell| cell.borrow().side == side)
3408    }
3409
3410    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
3411    ///
3412    /// # Panics
3413    ///
3414    /// Panics if any `client_order_id` in the set is not found in the cache.
3415    fn get_orders_for_ids(
3416        &self,
3417        client_order_ids: &AHashSet<ClientOrderId>,
3418        side: Option<OrderSide>,
3419    ) -> Vec<OrderRef<'_>> {
3420        let side = side.unwrap_or(OrderSide::NoOrderSide);
3421        let mut orders = Vec::new();
3422
3423        for client_order_id in client_order_ids {
3424            let order_cell = self
3425                .orders
3426                .get(client_order_id)
3427                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
3428            let order = OrderRef::new(order_cell.borrow());
3429
3430            if side == OrderSide::NoOrderSide || side == order.order_side() {
3431                orders.push(order);
3432            }
3433        }
3434
3435        // Sort so callers receive a deterministic Vec across runs; the
3436        // underlying client_order_ids set is AHash-backed.
3437        orders.sort_by_key(|o| o.client_order_id());
3438        orders
3439    }
3440
3441    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
3442    ///
3443    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
3444    /// those positions while the vector is alive will panic at runtime. Drop the vector before
3445    /// issuing writes.
3446    ///
3447    /// # Panics
3448    ///
3449    /// Panics if any `position_id` in the set is not found in the cache.
3450    fn get_positions_for_ids(
3451        &self,
3452        position_ids: &AHashSet<PositionId>,
3453        side: Option<PositionSide>,
3454    ) -> Vec<PositionRef<'_>> {
3455        let side = side.unwrap_or(PositionSide::NoPositionSide);
3456        let mut positions = Vec::new();
3457
3458        for position_id in position_ids {
3459            let position_cell = self
3460                .positions
3461                .get(position_id)
3462                .unwrap_or_else(|| panic!("Position {position_id} not found"));
3463            let position = PositionRef::new(position_cell.borrow());
3464
3465            if side == PositionSide::NoPositionSide || side == position.side {
3466                positions.push(position);
3467            }
3468        }
3469
3470        // Sort so callers receive a deterministic Vec across runs; the
3471        // underlying position_ids set is AHash-backed.
3472        positions.sort_by_key(|p| p.id);
3473        positions
3474    }
3475
3476    /// Returns the `ClientOrderId`s of all orders.
3477    #[must_use]
3478    pub fn client_order_ids(
3479        &self,
3480        venue: Option<&Venue>,
3481        instrument_id: Option<&InstrumentId>,
3482        strategy_id: Option<&StrategyId>,
3483        account_id: Option<&AccountId>,
3484    ) -> AHashSet<ClientOrderId> {
3485        self.query_orders_in_bucket(
3486            &self.index.orders,
3487            venue,
3488            instrument_id,
3489            strategy_id,
3490            account_id,
3491        )
3492    }
3493
3494    /// Returns the `ClientOrderId`s of all open orders.
3495    #[must_use]
3496    pub fn client_order_ids_open(
3497        &self,
3498        venue: Option<&Venue>,
3499        instrument_id: Option<&InstrumentId>,
3500        strategy_id: Option<&StrategyId>,
3501        account_id: Option<&AccountId>,
3502    ) -> AHashSet<ClientOrderId> {
3503        self.query_orders_in_bucket(
3504            &self.index.orders_open,
3505            venue,
3506            instrument_id,
3507            strategy_id,
3508            account_id,
3509        )
3510    }
3511
3512    /// Returns the `ClientOrderId`s of all closed orders.
3513    #[must_use]
3514    pub fn client_order_ids_closed(
3515        &self,
3516        venue: Option<&Venue>,
3517        instrument_id: Option<&InstrumentId>,
3518        strategy_id: Option<&StrategyId>,
3519        account_id: Option<&AccountId>,
3520    ) -> AHashSet<ClientOrderId> {
3521        self.query_orders_in_bucket(
3522            &self.index.orders_closed,
3523            venue,
3524            instrument_id,
3525            strategy_id,
3526            account_id,
3527        )
3528    }
3529
3530    /// Returns the `ClientOrderId`s of all locally active orders.
3531    ///
3532    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
3533    /// (a superset of emulated orders).
3534    #[must_use]
3535    pub fn client_order_ids_active_local(
3536        &self,
3537        venue: Option<&Venue>,
3538        instrument_id: Option<&InstrumentId>,
3539        strategy_id: Option<&StrategyId>,
3540        account_id: Option<&AccountId>,
3541    ) -> AHashSet<ClientOrderId> {
3542        self.query_orders_in_bucket(
3543            &self.index.orders_active_local,
3544            venue,
3545            instrument_id,
3546            strategy_id,
3547            account_id,
3548        )
3549    }
3550
3551    /// Returns the `ClientOrderId`s of all emulated orders.
3552    #[must_use]
3553    pub fn client_order_ids_emulated(
3554        &self,
3555        venue: Option<&Venue>,
3556        instrument_id: Option<&InstrumentId>,
3557        strategy_id: Option<&StrategyId>,
3558        account_id: Option<&AccountId>,
3559    ) -> AHashSet<ClientOrderId> {
3560        self.query_orders_in_bucket(
3561            &self.index.orders_emulated,
3562            venue,
3563            instrument_id,
3564            strategy_id,
3565            account_id,
3566        )
3567    }
3568
3569    /// Returns the `ClientOrderId`s of all in-flight orders.
3570    #[must_use]
3571    pub fn client_order_ids_inflight(
3572        &self,
3573        venue: Option<&Venue>,
3574        instrument_id: Option<&InstrumentId>,
3575        strategy_id: Option<&StrategyId>,
3576        account_id: Option<&AccountId>,
3577    ) -> AHashSet<ClientOrderId> {
3578        self.query_orders_in_bucket(
3579            &self.index.orders_inflight,
3580            venue,
3581            instrument_id,
3582            strategy_id,
3583            account_id,
3584        )
3585    }
3586
3587    /// Returns `PositionId`s of all positions.
3588    #[must_use]
3589    pub fn position_ids(
3590        &self,
3591        venue: Option<&Venue>,
3592        instrument_id: Option<&InstrumentId>,
3593        strategy_id: Option<&StrategyId>,
3594        account_id: Option<&AccountId>,
3595    ) -> AHashSet<PositionId> {
3596        self.query_positions_in_bucket(
3597            &self.index.positions,
3598            venue,
3599            instrument_id,
3600            strategy_id,
3601            account_id,
3602        )
3603    }
3604
3605    /// Returns the `PositionId`s of all open positions.
3606    #[must_use]
3607    pub fn position_open_ids(
3608        &self,
3609        venue: Option<&Venue>,
3610        instrument_id: Option<&InstrumentId>,
3611        strategy_id: Option<&StrategyId>,
3612        account_id: Option<&AccountId>,
3613    ) -> AHashSet<PositionId> {
3614        self.query_positions_in_bucket(
3615            &self.index.positions_open,
3616            venue,
3617            instrument_id,
3618            strategy_id,
3619            account_id,
3620        )
3621    }
3622
3623    /// Returns the `PositionId`s of all closed positions.
3624    #[must_use]
3625    pub fn position_closed_ids(
3626        &self,
3627        venue: Option<&Venue>,
3628        instrument_id: Option<&InstrumentId>,
3629        strategy_id: Option<&StrategyId>,
3630        account_id: Option<&AccountId>,
3631    ) -> AHashSet<PositionId> {
3632        self.query_positions_in_bucket(
3633            &self.index.positions_closed,
3634            venue,
3635            instrument_id,
3636            strategy_id,
3637            account_id,
3638        )
3639    }
3640
3641    /// Returns a borrowed view over the [`ClientOrderId`]s of all orders matching the optional
3642    /// filter parameters.
3643    ///
3644    /// The returned [`Cow`] borrows the underlying index when no filter is provided and only
3645    /// allocates an owned [`AHashSet`] when an intersection is required. Prefer this over
3646    /// [`Self::client_order_ids`] when the caller only needs to iterate or read membership.
3647    #[must_use]
3648    pub fn client_order_ids_view(
3649        &self,
3650        venue: Option<&Venue>,
3651        instrument_id: Option<&InstrumentId>,
3652        strategy_id: Option<&StrategyId>,
3653        account_id: Option<&AccountId>,
3654    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3655        self.view_orders_in_bucket(
3656            &self.index.orders,
3657            venue,
3658            instrument_id,
3659            strategy_id,
3660            account_id,
3661        )
3662    }
3663
3664    /// Returns a borrowed view over the [`ClientOrderId`]s of all open orders.
3665    #[must_use]
3666    pub fn client_order_ids_open_view(
3667        &self,
3668        venue: Option<&Venue>,
3669        instrument_id: Option<&InstrumentId>,
3670        strategy_id: Option<&StrategyId>,
3671        account_id: Option<&AccountId>,
3672    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3673        self.view_orders_in_bucket(
3674            &self.index.orders_open,
3675            venue,
3676            instrument_id,
3677            strategy_id,
3678            account_id,
3679        )
3680    }
3681
3682    /// Returns a borrowed view over the [`ClientOrderId`]s of all closed orders.
3683    #[must_use]
3684    pub fn client_order_ids_closed_view(
3685        &self,
3686        venue: Option<&Venue>,
3687        instrument_id: Option<&InstrumentId>,
3688        strategy_id: Option<&StrategyId>,
3689        account_id: Option<&AccountId>,
3690    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3691        self.view_orders_in_bucket(
3692            &self.index.orders_closed,
3693            venue,
3694            instrument_id,
3695            strategy_id,
3696            account_id,
3697        )
3698    }
3699
3700    /// Returns a borrowed view over the [`ClientOrderId`]s of all locally active orders.
3701    #[must_use]
3702    pub fn client_order_ids_active_local_view(
3703        &self,
3704        venue: Option<&Venue>,
3705        instrument_id: Option<&InstrumentId>,
3706        strategy_id: Option<&StrategyId>,
3707        account_id: Option<&AccountId>,
3708    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3709        self.view_orders_in_bucket(
3710            &self.index.orders_active_local,
3711            venue,
3712            instrument_id,
3713            strategy_id,
3714            account_id,
3715        )
3716    }
3717
3718    /// Returns a borrowed view over the [`ClientOrderId`]s of all emulated orders.
3719    #[must_use]
3720    pub fn client_order_ids_emulated_view(
3721        &self,
3722        venue: Option<&Venue>,
3723        instrument_id: Option<&InstrumentId>,
3724        strategy_id: Option<&StrategyId>,
3725        account_id: Option<&AccountId>,
3726    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3727        self.view_orders_in_bucket(
3728            &self.index.orders_emulated,
3729            venue,
3730            instrument_id,
3731            strategy_id,
3732            account_id,
3733        )
3734    }
3735
3736    /// Returns a borrowed view over the [`ClientOrderId`]s of all in-flight orders.
3737    #[must_use]
3738    pub fn client_order_ids_inflight_view(
3739        &self,
3740        venue: Option<&Venue>,
3741        instrument_id: Option<&InstrumentId>,
3742        strategy_id: Option<&StrategyId>,
3743        account_id: Option<&AccountId>,
3744    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3745        self.view_orders_in_bucket(
3746            &self.index.orders_inflight,
3747            venue,
3748            instrument_id,
3749            strategy_id,
3750            account_id,
3751        )
3752    }
3753
3754    /// Returns a borrowed view over the [`PositionId`]s of all positions.
3755    #[must_use]
3756    pub fn position_ids_view(
3757        &self,
3758        venue: Option<&Venue>,
3759        instrument_id: Option<&InstrumentId>,
3760        strategy_id: Option<&StrategyId>,
3761        account_id: Option<&AccountId>,
3762    ) -> Cow<'_, AHashSet<PositionId>> {
3763        self.view_positions_in_bucket(
3764            &self.index.positions,
3765            venue,
3766            instrument_id,
3767            strategy_id,
3768            account_id,
3769        )
3770    }
3771
3772    /// Returns a borrowed view over the [`PositionId`]s of all open positions.
3773    #[must_use]
3774    pub fn position_open_ids_view(
3775        &self,
3776        venue: Option<&Venue>,
3777        instrument_id: Option<&InstrumentId>,
3778        strategy_id: Option<&StrategyId>,
3779        account_id: Option<&AccountId>,
3780    ) -> Cow<'_, AHashSet<PositionId>> {
3781        self.view_positions_in_bucket(
3782            &self.index.positions_open,
3783            venue,
3784            instrument_id,
3785            strategy_id,
3786            account_id,
3787        )
3788    }
3789
3790    /// Returns a borrowed view over the [`PositionId`]s of all closed positions.
3791    #[must_use]
3792    pub fn position_closed_ids_view(
3793        &self,
3794        venue: Option<&Venue>,
3795        instrument_id: Option<&InstrumentId>,
3796        strategy_id: Option<&StrategyId>,
3797        account_id: Option<&AccountId>,
3798    ) -> Cow<'_, AHashSet<PositionId>> {
3799        self.view_positions_in_bucket(
3800            &self.index.positions_closed,
3801            venue,
3802            instrument_id,
3803            strategy_id,
3804            account_id,
3805        )
3806    }
3807
3808    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all orders matching the optional
3809    /// filter parameters.
3810    ///
3811    /// Avoids the [`AHashSet`] allocation performed by [`Self::client_order_ids`]. Useful when
3812    /// the caller iterates the result once and discards it.
3813    pub fn iter_client_order_ids(
3814        &self,
3815        venue: Option<&Venue>,
3816        instrument_id: Option<&InstrumentId>,
3817        strategy_id: Option<&StrategyId>,
3818        account_id: Option<&AccountId>,
3819    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3820        self.iter_orders_in_bucket(
3821            &self.index.orders,
3822            venue,
3823            instrument_id,
3824            strategy_id,
3825            account_id,
3826        )
3827    }
3828
3829    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all open orders.
3830    pub fn iter_client_order_ids_open(
3831        &self,
3832        venue: Option<&Venue>,
3833        instrument_id: Option<&InstrumentId>,
3834        strategy_id: Option<&StrategyId>,
3835        account_id: Option<&AccountId>,
3836    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3837        self.iter_orders_in_bucket(
3838            &self.index.orders_open,
3839            venue,
3840            instrument_id,
3841            strategy_id,
3842            account_id,
3843        )
3844    }
3845
3846    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all closed orders.
3847    pub fn iter_client_order_ids_closed(
3848        &self,
3849        venue: Option<&Venue>,
3850        instrument_id: Option<&InstrumentId>,
3851        strategy_id: Option<&StrategyId>,
3852        account_id: Option<&AccountId>,
3853    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3854        self.iter_orders_in_bucket(
3855            &self.index.orders_closed,
3856            venue,
3857            instrument_id,
3858            strategy_id,
3859            account_id,
3860        )
3861    }
3862
3863    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all locally active orders.
3864    pub fn iter_client_order_ids_active_local(
3865        &self,
3866        venue: Option<&Venue>,
3867        instrument_id: Option<&InstrumentId>,
3868        strategy_id: Option<&StrategyId>,
3869        account_id: Option<&AccountId>,
3870    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3871        self.iter_orders_in_bucket(
3872            &self.index.orders_active_local,
3873            venue,
3874            instrument_id,
3875            strategy_id,
3876            account_id,
3877        )
3878    }
3879
3880    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all emulated orders.
3881    pub fn iter_client_order_ids_emulated(
3882        &self,
3883        venue: Option<&Venue>,
3884        instrument_id: Option<&InstrumentId>,
3885        strategy_id: Option<&StrategyId>,
3886        account_id: Option<&AccountId>,
3887    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3888        self.iter_orders_in_bucket(
3889            &self.index.orders_emulated,
3890            venue,
3891            instrument_id,
3892            strategy_id,
3893            account_id,
3894        )
3895    }
3896
3897    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all in-flight orders.
3898    pub fn iter_client_order_ids_inflight(
3899        &self,
3900        venue: Option<&Venue>,
3901        instrument_id: Option<&InstrumentId>,
3902        strategy_id: Option<&StrategyId>,
3903        account_id: Option<&AccountId>,
3904    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3905        self.iter_orders_in_bucket(
3906            &self.index.orders_inflight,
3907            venue,
3908            instrument_id,
3909            strategy_id,
3910            account_id,
3911        )
3912    }
3913
3914    /// Returns a lazy iterator yielding [`PositionId`]s of all positions matching the filters.
3915    pub fn iter_position_ids(
3916        &self,
3917        venue: Option<&Venue>,
3918        instrument_id: Option<&InstrumentId>,
3919        strategy_id: Option<&StrategyId>,
3920        account_id: Option<&AccountId>,
3921    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3922        self.iter_positions_in_bucket(
3923            &self.index.positions,
3924            venue,
3925            instrument_id,
3926            strategy_id,
3927            account_id,
3928        )
3929    }
3930
3931    /// Returns a lazy iterator yielding [`PositionId`]s of all open positions.
3932    pub fn iter_position_open_ids(
3933        &self,
3934        venue: Option<&Venue>,
3935        instrument_id: Option<&InstrumentId>,
3936        strategy_id: Option<&StrategyId>,
3937        account_id: Option<&AccountId>,
3938    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3939        self.iter_positions_in_bucket(
3940            &self.index.positions_open,
3941            venue,
3942            instrument_id,
3943            strategy_id,
3944            account_id,
3945        )
3946    }
3947
3948    /// Returns a lazy iterator yielding [`PositionId`]s of all closed positions.
3949    pub fn iter_position_closed_ids(
3950        &self,
3951        venue: Option<&Venue>,
3952        instrument_id: Option<&InstrumentId>,
3953        strategy_id: Option<&StrategyId>,
3954        account_id: Option<&AccountId>,
3955    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3956        self.iter_positions_in_bucket(
3957            &self.index.positions_closed,
3958            venue,
3959            instrument_id,
3960            strategy_id,
3961            account_id,
3962        )
3963    }
3964
3965    /// Returns the `ComponentId`s of all actors.
3966    #[must_use]
3967    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
3968        self.index.actors.clone()
3969    }
3970
3971    /// Returns the `StrategyId`s of all strategies.
3972    #[must_use]
3973    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
3974        self.index.strategies.clone()
3975    }
3976
3977    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
3978    #[must_use]
3979    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
3980        self.index.exec_algorithms.clone()
3981    }
3982
3983    // -- ORDER QUERIES ---------------------------------------------------------------------------
3984
3985    /// Gets a borrow of the order with the `client_order_id` (if found).
3986    ///
3987    /// The returned [`OrderRef`] is tied to the cache borrow's scope and panics at runtime if
3988    /// held across a mutation of the same order. Drop the borrow before dispatching events; if
3989    /// post-event state is required, perform a fresh lookup. Use [`Self::order_owned`] when an
3990    /// owned snapshot is needed for a boundary handover.
3991    #[must_use]
3992    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
3993        self.orders
3994            .get(client_order_id)
3995            .map(|order_cell| OrderRef::new(order_cell.borrow()))
3996    }
3997
3998    /// Gets an exclusive write borrow of the order with the `client_order_id` (if found).
3999    ///
4000    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
4001    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
4002    /// exposes immutable cache borrows and therefore cannot reach this method.
4003    ///
4004    /// While the returned [`OrderRefMut`] is alive, no other read or write of the same order is
4005    /// permitted. Drop the borrow before dispatching events or taking any other cache borrow that
4006    /// may re-enter the same order.
4007    #[must_use]
4008    pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
4009        self.orders
4010            .get(client_order_id)
4011            .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
4012    }
4013
4014    /// Gets an owned snapshot of the order with the `client_order_id` (if found).
4015    ///
4016    /// Use when downstream needs an owned [`OrderAny`] that crosses a boundary (for example, an
4017    /// adapter `get_order` API). The snapshot will not reflect later cache mutations.
4018    #[must_use]
4019    pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
4020        self.orders
4021            .get(client_order_id)
4022            .map(|order_cell| order_cell.borrow().clone())
4023    }
4024
4025    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
4026    #[must_use]
4027    pub fn orders_for_ids(
4028        &self,
4029        client_order_ids: &[ClientOrderId],
4030        context: &dyn Display,
4031    ) -> Vec<OrderAny> {
4032        let mut orders = Vec::with_capacity(client_order_ids.len());
4033        for id in client_order_ids {
4034            match self.orders.get(id) {
4035                Some(order_cell) => orders.push(order_cell.borrow().clone()),
4036                None => log::error!("Order {id} not found in cache for {context}"),
4037            }
4038        }
4039        orders
4040    }
4041
4042    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
4043    #[must_use]
4044    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
4045        self.index.venue_order_ids.get(venue_order_id)
4046    }
4047
4048    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
4049    #[must_use]
4050    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
4051        self.index.client_order_ids.get(client_order_id)
4052    }
4053
4054    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
4055    #[must_use]
4056    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
4057        self.index.order_client.get(client_order_id)
4058    }
4059
4060    /// Returns borrows of all orders matching the optional filter parameters.
4061    ///
4062    /// Each [`Ref`] in the returned vector borrows its underlying cell; mutating any of
4063    /// those orders while the vector is alive will panic at runtime. Drop the vector
4064    /// before issuing writes.
4065    #[must_use]
4066    pub fn orders(
4067        &self,
4068        venue: Option<&Venue>,
4069        instrument_id: Option<&InstrumentId>,
4070        strategy_id: Option<&StrategyId>,
4071        account_id: Option<&AccountId>,
4072        side: Option<OrderSide>,
4073    ) -> Vec<OrderRef<'_>> {
4074        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
4075        self.get_orders_for_ids(&client_order_ids, side)
4076    }
4077
4078    /// Returns borrows of all open orders matching the optional filter parameters.
4079    #[must_use]
4080    pub fn orders_open(
4081        &self,
4082        venue: Option<&Venue>,
4083        instrument_id: Option<&InstrumentId>,
4084        strategy_id: Option<&StrategyId>,
4085        account_id: Option<&AccountId>,
4086        side: Option<OrderSide>,
4087    ) -> Vec<OrderRef<'_>> {
4088        let client_order_ids =
4089            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
4090        self.get_orders_for_ids(&client_order_ids, side)
4091    }
4092
4093    /// Returns borrows of all closed orders matching the optional filter parameters.
4094    #[must_use]
4095    pub fn orders_closed(
4096        &self,
4097        venue: Option<&Venue>,
4098        instrument_id: Option<&InstrumentId>,
4099        strategy_id: Option<&StrategyId>,
4100        account_id: Option<&AccountId>,
4101        side: Option<OrderSide>,
4102    ) -> Vec<OrderRef<'_>> {
4103        let client_order_ids =
4104            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
4105        self.get_orders_for_ids(&client_order_ids, side)
4106    }
4107
4108    /// Returns borrows of all locally active orders matching the optional filter parameters.
4109    ///
4110    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4111    /// (a superset of emulated orders).
4112    #[must_use]
4113    pub fn orders_active_local(
4114        &self,
4115        venue: Option<&Venue>,
4116        instrument_id: Option<&InstrumentId>,
4117        strategy_id: Option<&StrategyId>,
4118        account_id: Option<&AccountId>,
4119        side: Option<OrderSide>,
4120    ) -> Vec<OrderRef<'_>> {
4121        let client_order_ids =
4122            self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
4123        self.get_orders_for_ids(&client_order_ids, side)
4124    }
4125
4126    /// Returns borrows of all emulated orders matching the optional filter parameters.
4127    #[must_use]
4128    pub fn orders_emulated(
4129        &self,
4130        venue: Option<&Venue>,
4131        instrument_id: Option<&InstrumentId>,
4132        strategy_id: Option<&StrategyId>,
4133        account_id: Option<&AccountId>,
4134        side: Option<OrderSide>,
4135    ) -> Vec<OrderRef<'_>> {
4136        let client_order_ids =
4137            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
4138        self.get_orders_for_ids(&client_order_ids, side)
4139    }
4140
4141    /// Returns borrows of all in-flight orders matching the optional filter parameters.
4142    #[must_use]
4143    pub fn orders_inflight(
4144        &self,
4145        venue: Option<&Venue>,
4146        instrument_id: Option<&InstrumentId>,
4147        strategy_id: Option<&StrategyId>,
4148        account_id: Option<&AccountId>,
4149        side: Option<OrderSide>,
4150    ) -> Vec<OrderRef<'_>> {
4151        let client_order_ids =
4152            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
4153        self.get_orders_for_ids(&client_order_ids, side)
4154    }
4155
4156    /// Returns borrows of all orders for the `position_id`.
4157    #[must_use]
4158    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
4159        match self.index.position_orders.get(position_id) {
4160            Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
4161            None => Vec::new(),
4162        }
4163    }
4164
4165    /// Returns whether an order with the `client_order_id` exists.
4166    #[must_use]
4167    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
4168        self.index.orders.contains(client_order_id)
4169    }
4170
4171    /// Returns whether an order with the `client_order_id` is open.
4172    #[must_use]
4173    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
4174        self.index.orders_open.contains(client_order_id)
4175    }
4176
4177    /// Returns whether an order with the `client_order_id` is closed.
4178    #[must_use]
4179    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
4180        self.index.orders_closed.contains(client_order_id)
4181    }
4182
4183    /// Returns whether an order with the `client_order_id` is locally active.
4184    ///
4185    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4186    /// (a superset of emulated orders).
4187    #[must_use]
4188    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
4189        self.index.orders_active_local.contains(client_order_id)
4190    }
4191
4192    /// Returns whether an order with the `client_order_id` is emulated.
4193    #[must_use]
4194    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
4195        self.index.orders_emulated.contains(client_order_id)
4196    }
4197
4198    /// Returns whether an order with the `client_order_id` is in-flight.
4199    #[must_use]
4200    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
4201        self.index.orders_inflight.contains(client_order_id)
4202    }
4203
4204    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
4205    #[must_use]
4206    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
4207        self.index.orders_pending_cancel.contains(client_order_id)
4208    }
4209
4210    /// Returns the count of all open orders.
4211    #[must_use]
4212    pub fn orders_open_count(
4213        &self,
4214        venue: Option<&Venue>,
4215        instrument_id: Option<&InstrumentId>,
4216        strategy_id: Option<&StrategyId>,
4217        account_id: Option<&AccountId>,
4218        side: Option<OrderSide>,
4219    ) -> usize {
4220        self.count_orders_in_bucket(
4221            &self.index.orders_open,
4222            venue,
4223            instrument_id,
4224            strategy_id,
4225            account_id,
4226            side,
4227        )
4228    }
4229
4230    /// Returns the count of all closed orders.
4231    #[must_use]
4232    pub fn orders_closed_count(
4233        &self,
4234        venue: Option<&Venue>,
4235        instrument_id: Option<&InstrumentId>,
4236        strategy_id: Option<&StrategyId>,
4237        account_id: Option<&AccountId>,
4238        side: Option<OrderSide>,
4239    ) -> usize {
4240        self.count_orders_in_bucket(
4241            &self.index.orders_closed,
4242            venue,
4243            instrument_id,
4244            strategy_id,
4245            account_id,
4246            side,
4247        )
4248    }
4249
4250    /// Returns the count of all locally active orders.
4251    ///
4252    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4253    /// (a superset of emulated orders).
4254    #[must_use]
4255    pub fn orders_active_local_count(
4256        &self,
4257        venue: Option<&Venue>,
4258        instrument_id: Option<&InstrumentId>,
4259        strategy_id: Option<&StrategyId>,
4260        account_id: Option<&AccountId>,
4261        side: Option<OrderSide>,
4262    ) -> usize {
4263        self.count_orders_in_bucket(
4264            &self.index.orders_active_local,
4265            venue,
4266            instrument_id,
4267            strategy_id,
4268            account_id,
4269            side,
4270        )
4271    }
4272
4273    /// Returns the count of all emulated orders.
4274    #[must_use]
4275    pub fn orders_emulated_count(
4276        &self,
4277        venue: Option<&Venue>,
4278        instrument_id: Option<&InstrumentId>,
4279        strategy_id: Option<&StrategyId>,
4280        account_id: Option<&AccountId>,
4281        side: Option<OrderSide>,
4282    ) -> usize {
4283        self.count_orders_in_bucket(
4284            &self.index.orders_emulated,
4285            venue,
4286            instrument_id,
4287            strategy_id,
4288            account_id,
4289            side,
4290        )
4291    }
4292
4293    /// Returns the count of all in-flight orders.
4294    #[must_use]
4295    pub fn orders_inflight_count(
4296        &self,
4297        venue: Option<&Venue>,
4298        instrument_id: Option<&InstrumentId>,
4299        strategy_id: Option<&StrategyId>,
4300        account_id: Option<&AccountId>,
4301        side: Option<OrderSide>,
4302    ) -> usize {
4303        self.count_orders_in_bucket(
4304            &self.index.orders_inflight,
4305            venue,
4306            instrument_id,
4307            strategy_id,
4308            account_id,
4309            side,
4310        )
4311    }
4312
4313    /// Returns the count of all orders.
4314    #[must_use]
4315    pub fn orders_total_count(
4316        &self,
4317        venue: Option<&Venue>,
4318        instrument_id: Option<&InstrumentId>,
4319        strategy_id: Option<&StrategyId>,
4320        account_id: Option<&AccountId>,
4321        side: Option<OrderSide>,
4322    ) -> usize {
4323        self.count_orders_in_bucket(
4324            &self.index.orders,
4325            venue,
4326            instrument_id,
4327            strategy_id,
4328            account_id,
4329            side,
4330        )
4331    }
4332
4333    /// Returns whether any open order matches the optional filter parameters.
4334    ///
4335    /// Short-circuits on the first match, avoiding the full intersection walk performed by
4336    /// [`Self::orders_open_count`]. Prefer this over `orders_open_count(...) > 0` when only
4337    /// existence matters.
4338    #[must_use]
4339    pub fn has_orders_open(
4340        &self,
4341        venue: Option<&Venue>,
4342        instrument_id: Option<&InstrumentId>,
4343        strategy_id: Option<&StrategyId>,
4344        account_id: Option<&AccountId>,
4345        side: Option<OrderSide>,
4346    ) -> bool {
4347        self.any_orders_in_bucket(
4348            &self.index.orders_open,
4349            venue,
4350            instrument_id,
4351            strategy_id,
4352            account_id,
4353            side,
4354        )
4355    }
4356
4357    /// Returns whether any closed order matches the optional filter parameters.
4358    #[must_use]
4359    pub fn has_orders_closed(
4360        &self,
4361        venue: Option<&Venue>,
4362        instrument_id: Option<&InstrumentId>,
4363        strategy_id: Option<&StrategyId>,
4364        account_id: Option<&AccountId>,
4365        side: Option<OrderSide>,
4366    ) -> bool {
4367        self.any_orders_in_bucket(
4368            &self.index.orders_closed,
4369            venue,
4370            instrument_id,
4371            strategy_id,
4372            account_id,
4373            side,
4374        )
4375    }
4376
4377    /// Returns whether any locally active order matches the optional filter parameters.
4378    ///
4379    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state.
4380    #[must_use]
4381    pub fn has_orders_active_local(
4382        &self,
4383        venue: Option<&Venue>,
4384        instrument_id: Option<&InstrumentId>,
4385        strategy_id: Option<&StrategyId>,
4386        account_id: Option<&AccountId>,
4387        side: Option<OrderSide>,
4388    ) -> bool {
4389        self.any_orders_in_bucket(
4390            &self.index.orders_active_local,
4391            venue,
4392            instrument_id,
4393            strategy_id,
4394            account_id,
4395            side,
4396        )
4397    }
4398
4399    /// Returns whether any emulated order matches the optional filter parameters.
4400    #[must_use]
4401    pub fn has_orders_emulated(
4402        &self,
4403        venue: Option<&Venue>,
4404        instrument_id: Option<&InstrumentId>,
4405        strategy_id: Option<&StrategyId>,
4406        account_id: Option<&AccountId>,
4407        side: Option<OrderSide>,
4408    ) -> bool {
4409        self.any_orders_in_bucket(
4410            &self.index.orders_emulated,
4411            venue,
4412            instrument_id,
4413            strategy_id,
4414            account_id,
4415            side,
4416        )
4417    }
4418
4419    /// Returns whether any in-flight order matches the optional filter parameters.
4420    #[must_use]
4421    pub fn has_orders_inflight(
4422        &self,
4423        venue: Option<&Venue>,
4424        instrument_id: Option<&InstrumentId>,
4425        strategy_id: Option<&StrategyId>,
4426        account_id: Option<&AccountId>,
4427        side: Option<OrderSide>,
4428    ) -> bool {
4429        self.any_orders_in_bucket(
4430            &self.index.orders_inflight,
4431            venue,
4432            instrument_id,
4433            strategy_id,
4434            account_id,
4435            side,
4436        )
4437    }
4438
4439    /// Returns whether any order (in any state) matches the optional filter parameters.
4440    #[must_use]
4441    pub fn has_orders(
4442        &self,
4443        venue: Option<&Venue>,
4444        instrument_id: Option<&InstrumentId>,
4445        strategy_id: Option<&StrategyId>,
4446        account_id: Option<&AccountId>,
4447        side: Option<OrderSide>,
4448    ) -> bool {
4449        self.any_orders_in_bucket(
4450            &self.index.orders,
4451            venue,
4452            instrument_id,
4453            strategy_id,
4454            account_id,
4455            side,
4456        )
4457    }
4458
4459    /// Returns the order list for the `order_list_id`.
4460    #[must_use]
4461    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
4462        self.order_lists.get(order_list_id)
4463    }
4464
4465    /// Returns all order lists matching the optional filter parameters.
4466    #[must_use]
4467    pub fn order_lists(
4468        &self,
4469        venue: Option<&Venue>,
4470        instrument_id: Option<&InstrumentId>,
4471        strategy_id: Option<&StrategyId>,
4472        account_id: Option<&AccountId>,
4473    ) -> Vec<&OrderList> {
4474        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
4475
4476        if let Some(venue) = venue {
4477            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
4478        }
4479
4480        if let Some(instrument_id) = instrument_id {
4481            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
4482        }
4483
4484        if let Some(strategy_id) = strategy_id {
4485            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
4486        }
4487
4488        if let Some(account_id) = account_id {
4489            order_lists.retain(|ol| {
4490                ol.client_order_ids.iter().any(|client_order_id| {
4491                    self.orders.get(client_order_id).is_some_and(|order_cell| {
4492                        order_cell.borrow().account_id().as_ref() == Some(account_id)
4493                    })
4494                })
4495            });
4496        }
4497
4498        order_lists
4499    }
4500
4501    /// Returns whether an order list with the `order_list_id` exists.
4502    #[must_use]
4503    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
4504        self.order_lists.contains_key(order_list_id)
4505    }
4506
4507    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
4508
4509    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
4510    /// optional filter parameters.
4511    #[must_use]
4512    pub fn orders_for_exec_algorithm(
4513        &self,
4514        exec_algorithm_id: &ExecAlgorithmId,
4515        venue: Option<&Venue>,
4516        instrument_id: Option<&InstrumentId>,
4517        strategy_id: Option<&StrategyId>,
4518        account_id: Option<&AccountId>,
4519        side: Option<OrderSide>,
4520    ) -> Vec<OrderRef<'_>> {
4521        let Some(exec_algorithm_order_ids) =
4522            self.index.exec_algorithm_orders.get(exec_algorithm_id)
4523        else {
4524            return Vec::new();
4525        };
4526
4527        let filtered = self.query_orders_in_bucket(
4528            exec_algorithm_order_ids,
4529            venue,
4530            instrument_id,
4531            strategy_id,
4532            account_id,
4533        );
4534        self.get_orders_for_ids(&filtered, side)
4535    }
4536
4537    /// Returns references to all orders with the `exec_spawn_id`.
4538    #[must_use]
4539    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
4540        match self.index.exec_spawn_orders.get(exec_spawn_id) {
4541            Some(ids) => self.get_orders_for_ids(ids, None),
4542            None => Vec::new(),
4543        }
4544    }
4545
4546    /// Returns the total order quantity for the `exec_spawn_id`.
4547    #[must_use]
4548    pub fn exec_spawn_total_quantity(
4549        &self,
4550        exec_spawn_id: &ClientOrderId,
4551        active_only: bool,
4552    ) -> Option<Quantity> {
4553        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4554
4555        let mut total_quantity: Option<Quantity> = None;
4556
4557        for spawn_order in exec_spawn_orders {
4558            if active_only && spawn_order.is_closed() {
4559                continue;
4560            }
4561
4562            match total_quantity.as_mut() {
4563                Some(total) => *total = *total + spawn_order.quantity(),
4564                None => total_quantity = Some(spawn_order.quantity()),
4565            }
4566        }
4567
4568        total_quantity
4569    }
4570
4571    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
4572    #[must_use]
4573    pub fn exec_spawn_total_filled_qty(
4574        &self,
4575        exec_spawn_id: &ClientOrderId,
4576        active_only: bool,
4577    ) -> Option<Quantity> {
4578        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4579
4580        let mut total_quantity: Option<Quantity> = None;
4581
4582        for spawn_order in exec_spawn_orders {
4583            if active_only && spawn_order.is_closed() {
4584                continue;
4585            }
4586
4587            match total_quantity.as_mut() {
4588                Some(total) => *total = *total + spawn_order.filled_qty(),
4589                None => total_quantity = Some(spawn_order.filled_qty()),
4590            }
4591        }
4592
4593        total_quantity
4594    }
4595
4596    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
4597    #[must_use]
4598    pub fn exec_spawn_total_leaves_qty(
4599        &self,
4600        exec_spawn_id: &ClientOrderId,
4601        active_only: bool,
4602    ) -> Option<Quantity> {
4603        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4604
4605        let mut total_quantity: Option<Quantity> = None;
4606
4607        for spawn_order in exec_spawn_orders {
4608            if active_only && spawn_order.is_closed() {
4609                continue;
4610            }
4611
4612            match total_quantity.as_mut() {
4613                Some(total) => *total = *total + spawn_order.leaves_qty(),
4614                None => total_quantity = Some(spawn_order.leaves_qty()),
4615            }
4616        }
4617
4618        total_quantity
4619    }
4620
4621    // -- POSITION QUERIES ------------------------------------------------------------------------
4622
4623    /// Returns a borrow of the position with the `position_id` (if found).
4624    #[must_use]
4625    pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
4626        self.positions
4627            .get(position_id)
4628            .map(|position_cell| PositionRef::new(position_cell.borrow()))
4629    }
4630
4631    /// Gets an exclusive write borrow of the position with the `position_id` (if found).
4632    ///
4633    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
4634    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
4635    /// exposes immutable cache borrows and therefore cannot reach this method.
4636    ///
4637    /// While the returned [`PositionRefMut`] is alive, no other read or write of the same position
4638    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
4639    /// that may re-enter the same position.
4640    #[must_use]
4641    pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
4642        self.positions
4643            .get(position_id)
4644            .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
4645    }
4646
4647    /// Gets an owned snapshot of the position with the `position_id` (if found).
4648    ///
4649    /// Use when downstream needs an owned [`Position`] that crosses a boundary. The snapshot will
4650    /// not reflect later cache mutations.
4651    #[must_use]
4652    pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
4653        self.positions
4654            .get(position_id)
4655            .map(|position_cell| position_cell.borrow().clone())
4656    }
4657
4658    /// Returns a borrow of the position for the `client_order_id` (if found).
4659    #[must_use]
4660    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
4661        self.index
4662            .order_position
4663            .get(client_order_id)
4664            .and_then(|position_id| self.positions.get(position_id))
4665            .map(|position_cell| PositionRef::new(position_cell.borrow()))
4666    }
4667
4668    /// Returns a reference to the position ID for the `client_order_id` (if found).
4669    #[must_use]
4670    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
4671        self.index.order_position.get(client_order_id)
4672    }
4673
4674    /// Returns borrows of all positions matching the optional filter parameters.
4675    ///
4676    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
4677    /// those positions while the vector is alive will panic at runtime. Drop the vector before
4678    /// issuing writes.
4679    #[must_use]
4680    pub fn positions(
4681        &self,
4682        venue: Option<&Venue>,
4683        instrument_id: Option<&InstrumentId>,
4684        strategy_id: Option<&StrategyId>,
4685        account_id: Option<&AccountId>,
4686        side: Option<PositionSide>,
4687    ) -> Vec<PositionRef<'_>> {
4688        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
4689        self.get_positions_for_ids(&position_ids, side)
4690    }
4691
4692    /// Returns borrows of all open positions matching the optional filter parameters.
4693    #[must_use]
4694    pub fn positions_open(
4695        &self,
4696        venue: Option<&Venue>,
4697        instrument_id: Option<&InstrumentId>,
4698        strategy_id: Option<&StrategyId>,
4699        account_id: Option<&AccountId>,
4700        side: Option<PositionSide>,
4701    ) -> Vec<PositionRef<'_>> {
4702        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
4703        self.get_positions_for_ids(&position_ids, side)
4704    }
4705
4706    /// Returns borrows of all closed positions matching the optional filter parameters.
4707    #[must_use]
4708    pub fn positions_closed(
4709        &self,
4710        venue: Option<&Venue>,
4711        instrument_id: Option<&InstrumentId>,
4712        strategy_id: Option<&StrategyId>,
4713        account_id: Option<&AccountId>,
4714        side: Option<PositionSide>,
4715    ) -> Vec<PositionRef<'_>> {
4716        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
4717        self.get_positions_for_ids(&position_ids, side)
4718    }
4719
4720    /// Returns whether a position with the `position_id` exists.
4721    #[must_use]
4722    pub fn position_exists(&self, position_id: &PositionId) -> bool {
4723        self.index.positions.contains(position_id)
4724    }
4725
4726    /// Returns whether a position with the `position_id` is open.
4727    #[must_use]
4728    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
4729        self.index.positions_open.contains(position_id)
4730    }
4731
4732    /// Returns whether a position with the `position_id` is closed.
4733    #[must_use]
4734    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
4735        self.index.positions_closed.contains(position_id)
4736    }
4737
4738    /// Returns the count of all open positions.
4739    #[must_use]
4740    pub fn positions_open_count(
4741        &self,
4742        venue: Option<&Venue>,
4743        instrument_id: Option<&InstrumentId>,
4744        strategy_id: Option<&StrategyId>,
4745        account_id: Option<&AccountId>,
4746        side: Option<PositionSide>,
4747    ) -> usize {
4748        self.count_positions_in_bucket(
4749            &self.index.positions_open,
4750            venue,
4751            instrument_id,
4752            strategy_id,
4753            account_id,
4754            side,
4755        )
4756    }
4757
4758    /// Returns the count of all closed positions.
4759    #[must_use]
4760    pub fn positions_closed_count(
4761        &self,
4762        venue: Option<&Venue>,
4763        instrument_id: Option<&InstrumentId>,
4764        strategy_id: Option<&StrategyId>,
4765        account_id: Option<&AccountId>,
4766        side: Option<PositionSide>,
4767    ) -> usize {
4768        self.count_positions_in_bucket(
4769            &self.index.positions_closed,
4770            venue,
4771            instrument_id,
4772            strategy_id,
4773            account_id,
4774            side,
4775        )
4776    }
4777
4778    /// Returns the count of all positions.
4779    #[must_use]
4780    pub fn positions_total_count(
4781        &self,
4782        venue: Option<&Venue>,
4783        instrument_id: Option<&InstrumentId>,
4784        strategy_id: Option<&StrategyId>,
4785        account_id: Option<&AccountId>,
4786        side: Option<PositionSide>,
4787    ) -> usize {
4788        self.count_positions_in_bucket(
4789            &self.index.positions,
4790            venue,
4791            instrument_id,
4792            strategy_id,
4793            account_id,
4794            side,
4795        )
4796    }
4797
4798    /// Returns whether any open position matches the optional filter parameters.
4799    ///
4800    /// Short-circuits on the first match, avoiding the full intersection walk performed by
4801    /// [`Self::positions_open_count`]. Prefer this over `positions_open_count(...) > 0` when
4802    /// only existence matters.
4803    #[must_use]
4804    pub fn has_positions_open(
4805        &self,
4806        venue: Option<&Venue>,
4807        instrument_id: Option<&InstrumentId>,
4808        strategy_id: Option<&StrategyId>,
4809        account_id: Option<&AccountId>,
4810        side: Option<PositionSide>,
4811    ) -> bool {
4812        self.any_positions_in_bucket(
4813            &self.index.positions_open,
4814            venue,
4815            instrument_id,
4816            strategy_id,
4817            account_id,
4818            side,
4819        )
4820    }
4821
4822    /// Returns whether any closed position matches the optional filter parameters.
4823    #[must_use]
4824    pub fn has_positions_closed(
4825        &self,
4826        venue: Option<&Venue>,
4827        instrument_id: Option<&InstrumentId>,
4828        strategy_id: Option<&StrategyId>,
4829        account_id: Option<&AccountId>,
4830        side: Option<PositionSide>,
4831    ) -> bool {
4832        self.any_positions_in_bucket(
4833            &self.index.positions_closed,
4834            venue,
4835            instrument_id,
4836            strategy_id,
4837            account_id,
4838            side,
4839        )
4840    }
4841
4842    /// Returns whether any position (open or closed) matches the optional filter parameters.
4843    #[must_use]
4844    pub fn has_positions(
4845        &self,
4846        venue: Option<&Venue>,
4847        instrument_id: Option<&InstrumentId>,
4848        strategy_id: Option<&StrategyId>,
4849        account_id: Option<&AccountId>,
4850        side: Option<PositionSide>,
4851    ) -> bool {
4852        self.any_positions_in_bucket(
4853            &self.index.positions,
4854            venue,
4855            instrument_id,
4856            strategy_id,
4857            account_id,
4858            side,
4859        )
4860    }
4861
4862    // -- STRATEGY QUERIES ------------------------------------------------------------------------
4863
4864    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
4865    #[must_use]
4866    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
4867        self.index.order_strategy.get(client_order_id)
4868    }
4869
4870    /// Gets a reference to the strategy ID for the `position_id` (if found).
4871    #[must_use]
4872    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
4873        self.index.position_strategy.get(position_id)
4874    }
4875
4876    // -- GENERAL ---------------------------------------------------------------------------------
4877
4878    /// Gets a reference to the general value for the `key` (if found).
4879    ///
4880    /// # Errors
4881    ///
4882    /// Returns an error if the `key` is invalid.
4883    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
4884        check_valid_string_ascii(key, stringify!(key))?;
4885
4886        Ok(self.general.get(key))
4887    }
4888
4889    // -- DATA QUERIES ----------------------------------------------------------------------------
4890
4891    /// Returns the price for the `instrument_id` and `price_type` (if found).
4892    #[must_use]
4893    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
4894        match price_type {
4895            PriceType::Bid => self
4896                .quotes
4897                .get(instrument_id)
4898                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
4899            PriceType::Ask => self
4900                .quotes
4901                .get(instrument_id)
4902                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
4903            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
4904                quotes.front().map(|quote| {
4905                    Price::new(
4906                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
4907                        quote.bid_price.precision + 1,
4908                    )
4909                })
4910            }),
4911            PriceType::Last => self
4912                .trades
4913                .get(instrument_id)
4914                .and_then(|trades| trades.front().map(|trade| trade.price)),
4915            PriceType::Mark => self
4916                .mark_prices
4917                .get(instrument_id)
4918                .and_then(|marks| marks.front().map(|mark| mark.value)),
4919        }
4920    }
4921
4922    /// Gets all quotes for the `instrument_id`.
4923    #[must_use]
4924    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
4925        self.quotes
4926            .get(instrument_id)
4927            .map(|quotes| quotes.iter().copied().collect())
4928    }
4929
4930    /// Gets all trades for the `instrument_id`.
4931    #[must_use]
4932    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
4933        self.trades
4934            .get(instrument_id)
4935            .map(|trades| trades.iter().copied().collect())
4936    }
4937
4938    /// Gets all mark price updates for the `instrument_id`.
4939    #[must_use]
4940    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
4941        self.mark_prices
4942            .get(instrument_id)
4943            .map(|mark_prices| mark_prices.iter().copied().collect())
4944    }
4945
4946    /// Gets all index price updates for the `instrument_id`.
4947    #[must_use]
4948    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
4949        self.index_prices
4950            .get(instrument_id)
4951            .map(|index_prices| index_prices.iter().copied().collect())
4952    }
4953
4954    /// Gets all funding rate updates for the `instrument_id`.
4955    #[must_use]
4956    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
4957        self.funding_rates
4958            .get(instrument_id)
4959            .map(|funding_rates| funding_rates.iter().copied().collect())
4960    }
4961
4962    /// Gets all instrument status updates for the `instrument_id`.
4963    #[must_use]
4964    pub fn instrument_statuses(
4965        &self,
4966        instrument_id: &InstrumentId,
4967    ) -> Option<Vec<InstrumentStatus>> {
4968        self.instrument_statuses
4969            .get(instrument_id)
4970            .map(|statuses| statuses.iter().copied().collect())
4971    }
4972
4973    /// Gets all bars for the `bar_type`.
4974    #[must_use]
4975    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
4976        self.bars
4977            .get(bar_type)
4978            .map(|bars| bars.iter().copied().collect())
4979    }
4980
4981    /// Gets a reference to the order book for the `instrument_id`.
4982    #[must_use]
4983    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
4984        self.books.get(instrument_id)
4985    }
4986
4987    /// Gets a reference to the order book for the `instrument_id`.
4988    #[must_use]
4989    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
4990        self.books.get_mut(instrument_id)
4991    }
4992
4993    /// Gets a reference to the own order book for the `instrument_id`.
4994    #[must_use]
4995    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
4996        self.own_books.get(instrument_id)
4997    }
4998
4999    /// Gets a reference to the own order book for the `instrument_id`.
5000    #[must_use]
5001    pub fn own_order_book_mut(
5002        &mut self,
5003        instrument_id: &InstrumentId,
5004    ) -> Option<&mut OwnOrderBook> {
5005        self.own_books.get_mut(instrument_id)
5006    }
5007
5008    /// Gets a reference to the latest quote for the `instrument_id`.
5009    #[must_use]
5010    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
5011        self.quotes
5012            .get(instrument_id)
5013            .and_then(|quotes| quotes.front())
5014    }
5015
5016    /// Gets a reference to the quote at `index` for the `instrument_id`.
5017    ///
5018    /// Index 0 is the most recent.
5019    #[must_use]
5020    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
5021        self.quotes
5022            .get(instrument_id)
5023            .and_then(|quotes| quotes.get(index))
5024    }
5025
5026    /// Gets a reference to the latest trade for the `instrument_id`.
5027    #[must_use]
5028    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
5029        self.trades
5030            .get(instrument_id)
5031            .and_then(|trades| trades.front())
5032    }
5033
5034    /// Gets a reference to the trade at `index` for the `instrument_id`.
5035    ///
5036    /// Index 0 is the most recent.
5037    #[must_use]
5038    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
5039        self.trades
5040            .get(instrument_id)
5041            .and_then(|trades| trades.get(index))
5042    }
5043
5044    /// Gets a reference to the latest mark price update for the `instrument_id`.
5045    #[must_use]
5046    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
5047        self.mark_prices
5048            .get(instrument_id)
5049            .and_then(|mark_prices| mark_prices.front())
5050    }
5051
5052    /// Gets a reference to the latest index price update for the `instrument_id`.
5053    #[must_use]
5054    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
5055        self.index_prices
5056            .get(instrument_id)
5057            .and_then(|index_prices| index_prices.front())
5058    }
5059
5060    /// Gets a reference to the latest funding rate update for the `instrument_id`.
5061    #[must_use]
5062    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
5063        self.funding_rates
5064            .get(instrument_id)
5065            .and_then(|funding_rates| funding_rates.front())
5066    }
5067
5068    /// Gets a reference to the latest instrument status update for the `instrument_id`.
5069    #[must_use]
5070    pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
5071        self.instrument_statuses
5072            .get(instrument_id)
5073            .and_then(|statuses| statuses.front())
5074    }
5075
5076    /// Gets a reference to the latest bar for the `bar_type`.
5077    #[must_use]
5078    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
5079        self.bars.get(bar_type).and_then(|bars| bars.front())
5080    }
5081
5082    /// Gets a reference to the bar at `index` for the `bar_type`.
5083    ///
5084    /// Index 0 is the most recent.
5085    #[must_use]
5086    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
5087        self.bars.get(bar_type).and_then(|bars| bars.get(index))
5088    }
5089
5090    /// Gets the order book update count for the `instrument_id`.
5091    #[must_use]
5092    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
5093        self.books
5094            .get(instrument_id)
5095            .map_or(0, |book| book.update_count) as usize
5096    }
5097
5098    /// Gets the quote tick count for the `instrument_id`.
5099    #[must_use]
5100    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
5101        self.quotes
5102            .get(instrument_id)
5103            .map_or(0, std::collections::VecDeque::len)
5104    }
5105
5106    /// Gets the trade tick count for the `instrument_id`.
5107    #[must_use]
5108    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
5109        self.trades
5110            .get(instrument_id)
5111            .map_or(0, std::collections::VecDeque::len)
5112    }
5113
5114    /// Gets the bar count for the `instrument_id`.
5115    #[must_use]
5116    pub fn bar_count(&self, bar_type: &BarType) -> usize {
5117        self.bars
5118            .get(bar_type)
5119            .map_or(0, std::collections::VecDeque::len)
5120    }
5121
5122    /// Returns whether the cache contains an order book for the `instrument_id`.
5123    #[must_use]
5124    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
5125        self.books.contains_key(instrument_id)
5126    }
5127
5128    /// Returns whether the cache contains quotes for the `instrument_id`.
5129    #[must_use]
5130    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
5131        self.quote_count(instrument_id) > 0
5132    }
5133
5134    /// Returns whether the cache contains trades for the `instrument_id`.
5135    #[must_use]
5136    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
5137        self.trade_count(instrument_id) > 0
5138    }
5139
5140    /// Returns whether the cache contains bars for the `bar_type`.
5141    #[must_use]
5142    pub fn has_bars(&self, bar_type: &BarType) -> bool {
5143        self.bar_count(bar_type) > 0
5144    }
5145
5146    #[must_use]
5147    pub fn get_xrate(
5148        &self,
5149        venue: Venue,
5150        from_currency: Currency,
5151        to_currency: Currency,
5152        price_type: PriceType,
5153    ) -> Option<f64> {
5154        if from_currency == to_currency {
5155            // When the source and target currencies are identical,
5156            // no conversion is needed; return an exchange rate of 1.0.
5157            return Some(1.0);
5158        }
5159
5160        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
5161
5162        match get_exchange_rate(
5163            from_currency.code,
5164            to_currency.code,
5165            price_type,
5166            bid_quote,
5167            ask_quote,
5168        ) {
5169            Ok(rate) => rate,
5170            Err(e) => {
5171                log::error!("Failed to calculate xrate: {e}");
5172                None
5173            }
5174        }
5175    }
5176
5177    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
5178        let mut bid_quotes = AHashMap::new();
5179        let mut ask_quotes = AHashMap::new();
5180
5181        for instrument_id in self.instruments.keys() {
5182            if instrument_id.venue != *venue {
5183                continue;
5184            }
5185
5186            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
5187                if let Some(tick) = ticks.front() {
5188                    (tick.bid_price, tick.ask_price)
5189                } else {
5190                    continue; // Empty ticks vector
5191                }
5192            } else {
5193                let bid_bar = self
5194                    .bars
5195                    .iter()
5196                    .find(|(k, _)| {
5197                        k.instrument_id() == *instrument_id
5198                            && matches!(k.spec().price_type, PriceType::Bid)
5199                    })
5200                    .map(|(_, v)| v);
5201
5202                let ask_bar = self
5203                    .bars
5204                    .iter()
5205                    .find(|(k, _)| {
5206                        k.instrument_id() == *instrument_id
5207                            && matches!(k.spec().price_type, PriceType::Ask)
5208                    })
5209                    .map(|(_, v)| v);
5210
5211                match (bid_bar, ask_bar) {
5212                    (Some(bid), Some(ask)) => {
5213                        match (bid.front(), ask.front()) {
5214                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
5215                            _ => {
5216                                // Empty bar VecDeques
5217                                continue;
5218                            }
5219                        }
5220                    }
5221                    _ => continue,
5222                }
5223            };
5224
5225            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
5226            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
5227        }
5228
5229        (bid_quotes, ask_quotes)
5230    }
5231
5232    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
5233    #[must_use]
5234    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
5235        self.mark_xrates.get(&(from_currency, to_currency)).copied()
5236    }
5237
5238    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
5239    ///
5240    /// # Panics
5241    ///
5242    /// Panics if `xrate` is not positive.
5243    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
5244        assert!(xrate > 0.0, "xrate was zero");
5245        self.mark_xrates.insert((from_currency, to_currency), xrate);
5246        self.mark_xrates
5247            .insert((to_currency, from_currency), 1.0 / xrate);
5248    }
5249
5250    /// Clears the mark exchange rate for the given currency pair.
5251    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
5252        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
5253    }
5254
5255    /// Clears all mark exchange rates.
5256    pub fn clear_mark_xrates(&mut self) {
5257        self.mark_xrates.clear();
5258    }
5259
5260    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
5261
5262    /// Returns a reference to the instrument for the `instrument_id` (if found).
5263    #[must_use]
5264    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
5265        self.instruments.get(instrument_id)
5266    }
5267
5268    /// Returns references to all instrument IDs for the `venue`.
5269    #[must_use]
5270    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
5271        match venue {
5272            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
5273            None => self.instruments.keys().collect(),
5274        }
5275    }
5276
5277    /// Returns references to all instruments for the `venue`.
5278    #[must_use]
5279    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
5280        self.instruments
5281            .values()
5282            .filter(|i| &i.id().venue == venue)
5283            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
5284            .collect()
5285    }
5286
5287    /// Returns references to all instruments for the `venue` whose underlying
5288    /// equals `root` and whose [`InstrumentClass`] equals `class`.
5289    ///
5290    /// Use when expanding a parent-symbol subscription: filtering by class as
5291    /// well as root prevents leaves of a different class (e.g. options when
5292    /// the user asked for futures, or vice versa) from being pulled in.
5293    #[must_use]
5294    pub fn instruments_by_parent(
5295        &self,
5296        venue: &Venue,
5297        root: &Ustr,
5298        class: InstrumentClass,
5299    ) -> Vec<&InstrumentAny> {
5300        self.instruments
5301            .values()
5302            .filter(|i| &i.id().venue == venue)
5303            .filter(|i| i.underlying() == Some(*root))
5304            .filter(|i| i.instrument_class() == class)
5305            .collect()
5306    }
5307
5308    /// Returns references to all bar types contained in the cache.
5309    #[must_use]
5310    pub fn bar_types(
5311        &self,
5312        instrument_id: Option<&InstrumentId>,
5313        price_type: Option<&PriceType>,
5314        aggregation_source: AggregationSource,
5315    ) -> Vec<&BarType> {
5316        let mut bar_types = self
5317            .bars
5318            .keys()
5319            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
5320            .collect::<Vec<&BarType>>();
5321
5322        if let Some(instrument_id) = instrument_id {
5323            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
5324        }
5325
5326        if let Some(price_type) = price_type {
5327            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
5328        }
5329
5330        bar_types
5331    }
5332
5333    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
5334
5335    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
5336    #[must_use]
5337    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
5338        self.synthetics.get(instrument_id)
5339    }
5340
5341    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
5342    #[must_use]
5343    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
5344        self.synthetics.keys().collect()
5345    }
5346
5347    /// Returns references to all synthetic instruments contained in the cache.
5348    #[must_use]
5349    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
5350        self.synthetics.values().collect()
5351    }
5352
5353    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
5354
5355    /// Returns a borrow of the account for the `account_id` (if found).
5356    #[must_use]
5357    pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
5358        self.accounts
5359            .get(account_id)
5360            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5361    }
5362
5363    /// Gets an exclusive write borrow of the account with the `account_id` (if found).
5364    ///
5365    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
5366    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
5367    /// exposes immutable cache borrows and therefore cannot reach this method.
5368    ///
5369    /// While the returned [`AccountRefMut`] is alive, no other read or write of the same account
5370    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
5371    /// that may re-enter the same account.
5372    #[must_use]
5373    pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
5374        self.accounts
5375            .get(account_id)
5376            .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
5377    }
5378
5379    /// Gets an owned snapshot of the account with the `account_id` (if found).
5380    ///
5381    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
5382    /// will not reflect later cache mutations.
5383    #[must_use]
5384    pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
5385        self.accounts
5386            .get(account_id)
5387            .map(|account_cell| account_cell.borrow().clone())
5388    }
5389
5390    /// Returns a borrow of the account for the `venue` (if found).
5391    #[must_use]
5392    pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
5393        self.index
5394            .venue_account
5395            .get(venue)
5396            .and_then(|account_id| self.accounts.get(account_id))
5397            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5398    }
5399
5400    /// Returns an owned snapshot of the account for the `venue` (if found).
5401    ///
5402    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
5403    /// will not reflect later cache mutations.
5404    #[must_use]
5405    pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
5406        self.index
5407            .venue_account
5408            .get(venue)
5409            .and_then(|account_id| self.accounts.get(account_id))
5410            .map(|account_cell| account_cell.borrow().clone())
5411    }
5412
5413    /// Returns a reference to the account ID for the `venue` (if found).
5414    #[must_use]
5415    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
5416        self.index.venue_account.get(venue)
5417    }
5418
5419    /// Returns borrows of all accounts for the `account_id`.
5420    ///
5421    /// Each [`AccountRef`] in the returned vector borrows its underlying cell; mutating any of
5422    /// those accounts while the vector is alive will panic at runtime. Drop the vector before
5423    /// issuing writes.
5424    #[must_use]
5425    pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
5426        self.accounts
5427            .values()
5428            .filter(|account_cell| &account_cell.borrow().id() == account_id)
5429            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5430            .collect()
5431    }
5432
5433    /// Updates the own order book with an order.
5434    ///
5435    /// This method adds, updates, or removes an order from the own order book
5436    /// based on the order's current state.
5437    ///
5438    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
5439    /// represented in own books.
5440    pub fn update_own_order_book(&mut self, order: &OrderAny) {
5441        if !order.has_price() {
5442            return;
5443        }
5444
5445        let instrument_id = order.instrument_id();
5446
5447        if !self.own_books.contains_key(&instrument_id) {
5448            if order.is_closed() {
5449                return;
5450            }
5451
5452            self.own_books
5453                .insert(instrument_id, OwnOrderBook::new(instrument_id));
5454        }
5455
5456        let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
5457            return;
5458        };
5459
5460        let own_book_order = order.to_own_book_order();
5461
5462        if order.is_closed() {
5463            if let Err(e) = own_book.delete(own_book_order) {
5464                log::debug!(
5465                    "Failed to delete order {} from own book: {e}",
5466                    order.client_order_id(),
5467                );
5468            } else {
5469                log::debug!("Deleted order {} from own book", order.client_order_id());
5470            }
5471        } else {
5472            // Add or update the order in the own book
5473            if let Err(e) = own_book.update(own_book_order) {
5474                log::debug!(
5475                    "Failed to update order {} in own book: {e}; inserting instead",
5476                    order.client_order_id(),
5477                );
5478                own_book.add(own_book_order);
5479            }
5480            log::debug!("Updated order {} in own book", order.client_order_id());
5481        }
5482    }
5483
5484    /// Force removal of an order from own order books and clean up all indexes.
5485    ///
5486    /// This method is used when order event application fails and we need to ensure
5487    /// terminal orders are properly cleaned up from own books and all relevant indexes.
5488    /// Replicates the index cleanup that `update_order` performs for closed orders.
5489    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
5490        let Some(order_cell) = self.orders.get(client_order_id) else {
5491            return;
5492        };
5493        let order = order_cell.borrow();
5494        let instrument_id = order.instrument_id();
5495        let own_book_order = if order.has_price() {
5496            Some(order.to_own_book_order())
5497        } else {
5498            None
5499        };
5500        drop(order);
5501
5502        self.index.orders_open.remove(client_order_id);
5503        self.index.orders_pending_cancel.remove(client_order_id);
5504        self.index.orders_inflight.remove(client_order_id);
5505        self.index.orders_emulated.remove(client_order_id);
5506        self.index.orders_active_local.remove(client_order_id);
5507
5508        if let Some(own_book) = self.own_books.get_mut(&instrument_id)
5509            && let Some(own_book_order) = own_book_order
5510        {
5511            if let Err(e) = own_book.delete(own_book_order) {
5512                log::debug!("Could not force delete {client_order_id} from own book: {e}");
5513            } else {
5514                log::debug!("Force deleted {client_order_id} from own book");
5515            }
5516        }
5517
5518        self.index.orders_closed.insert(*client_order_id);
5519    }
5520
5521    /// Audit all own order books against open and inflight order indexes.
5522    ///
5523    /// Ensures closed orders are removed from own order books. This includes both
5524    /// orders tracked in `orders_open` (`ACCEPTED`, `TRIGGERED`, `PENDING_*`, `PARTIALLY_FILLED`)
5525    /// and `orders_inflight` (`INITIALIZED`, `SUBMITTED`) to prevent false positives
5526    /// during venue latency windows.
5527    pub fn audit_own_order_books(&mut self) {
5528        log::debug!("Starting own books audit");
5529        let start = std::time::Instant::now();
5530
5531        // Build union of open and inflight orders for audit,
5532        // this prevents false positives for SUBMITTED orders during venue latency.
5533        let valid_order_ids: AHashSet<ClientOrderId> = self
5534            .index
5535            .orders_open
5536            .union(&self.index.orders_inflight)
5537            .copied()
5538            .collect();
5539
5540        for own_book in self.own_books.values_mut() {
5541            own_book.audit_open_orders(&valid_order_ids);
5542        }
5543
5544        log::debug!("Completed own books audit in {:?}", start.elapsed());
5545    }
5546}
5547
5548fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
5549    let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
5550        anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
5551    };
5552
5553    let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
5554        anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
5555    };
5556
5557    if position_id.is_empty() {
5558        anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
5559    }
5560
5561    let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
5562        anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
5563    })?;
5564
5565    Ok((PositionId::new(position_id), snapshot_index))
5566}
5567
5568fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
5569    let snapshot = serde_json::from_slice::<Position>(blob)?;
5570    let expected_prefix = format!("{}-", position_id.as_str());
5571
5572    let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
5573        anyhow::bail!(
5574            "position snapshot id {} does not match blob_ref position {position_id}",
5575            snapshot.id
5576        );
5577    };
5578
5579    if UUID4::from_str(snapshot_uuid).is_err() {
5580        anyhow::bail!(
5581            "position snapshot id {} does not match blob_ref position {position_id}",
5582            snapshot.id
5583        );
5584    }
5585
5586    Ok(())
5587}