Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24pub mod refs;
25
26mod bounded;
27mod index;
28
29#[cfg(test)]
30mod tests;
31
32use std::{
33    borrow::Cow,
34    cell::{Ref, RefCell},
35    fmt::{Debug, Display},
36    rc::Rc,
37    str::FromStr,
38    time::{SystemTime, UNIX_EPOCH},
39};
40
41use ahash::{AHashMap, AHashSet};
42use bounded::BoundedVecDeque;
43use bytes::Bytes;
44pub use config::CacheConfig; // Re-export
45use database::{CacheDatabaseAdapter, CacheMap};
46use index::CacheIndex;
47use nautilus_core::{
48    SharedCell, UUID4, UnixNanos,
49    correctness::{
50        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
51        check_valid_string_ascii,
52    },
53    datetime::secs_to_nanos_unchecked,
54};
55use nautilus_model::{
56    accounts::{Account, AccountAny},
57    data::{
58        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
59        MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
60    },
61    enums::{
62        AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
63        PriceType, TriggerType,
64    },
65    events::{AccountState, OrderEventAny},
66    identifiers::{
67        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
68        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
69    },
70    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
71    orderbook::{
72        OrderBook,
73        own::{OwnOrderBook, should_handle_own_book_order},
74    },
75    orders::{Order, OrderAny, OrderError, OrderList},
76    position::Position,
77    types::{Currency, Money, Price, Quantity},
78};
79pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
80use rust_decimal::Decimal;
81use ustr::Ustr;
82
83use crate::xrate::get_exchange_rate;
84
85/// Cache-owned reference to a snapshot blob.
86///
87/// The cache writes and later fetches the blob; external systems persist this opaque reference
88/// and may hash the bytes before recording a durable anchor.
89#[derive(Clone, Debug, PartialEq, Eq)]
90pub struct CacheSnapshotRef {
91    /// Opaque cache-owned snapshot location.
92    pub blob_ref: String,
93    /// Snapshot bytes stored under [`Self::blob_ref`].
94    pub blob: Bytes,
95}
96
97impl CacheSnapshotRef {
98    /// Creates a new [`CacheSnapshotRef`].
99    #[must_use]
100    pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
101        Self {
102            blob_ref: blob_ref.into(),
103            blob: blob.into(),
104        }
105    }
106}
107
108/// Read-only view over the platform cache.
109///
110/// Adapter-facing code receives this type instead of the mutable cache handle so cache writes stay
111/// owned by the data and execution engines.
112#[derive(Clone, Debug)]
113pub struct CacheView {
114    inner: Rc<RefCell<Cache>>,
115}
116
117impl CacheView {
118    /// Creates a new [`CacheView`] from a cache handle.
119    #[must_use]
120    pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
121        Self { inner }
122    }
123
124    /// Borrows the cache immutably.
125    ///
126    /// # Panics
127    ///
128    /// Panics if the cache is already mutably borrowed.
129    pub fn borrow(&self) -> Ref<'_, Cache> {
130        self.inner.borrow()
131    }
132}
133
134impl From<Rc<RefCell<Cache>>> for CacheView {
135    fn from(inner: Rc<RefCell<Cache>>) -> Self {
136        Self::new(inner)
137    }
138}
139
// Filter sources resolved from an order or position query.
//
// Captures the three states of a multi-key index intersection without committing to an owned
// result set: no filters at all (the caller iterates the bucket directly), one or more filter
// sources resolved successfully (intersect them lazily), or one filter resolved to no entries
// at all (the result is unconditionally empty).
//
// The `'a` lifetime ties the borrowed sets in `Sets` to wherever the caller resolved them from.
enum FilterSources<'a, K> {
    // No filters were supplied; the caller iterates the bucket directly.
    Unfiltered,
    // A filter resolved to no entries, so the overall result is unconditionally empty.
    Empty,
    // One or more filters resolved to borrowed key sets that still need intersecting.
    Sets(Vec<&'a AHashSet<K>>),
}
151
152// Intersects a non-empty collection of filter sources by sorting them ascending by length and
153// driving the loop from the smallest set, collecting one `AHashSet` of matching keys.
154//
155// Single-source inputs short-circuit to a direct `AHashSet::clone` (memcopy of the bucket
156// table) rather than rehashing each entry through `iter().copied().collect()`.
157fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
158where
159    K: Copy + Eq + std::hash::Hash,
160{
161    debug_assert!(!sources.is_empty());
162    sources.sort_unstable_by_key(|s| s.len());
163    let driver = sources[0];
164    let rest = &sources[1..];
165
166    if rest.is_empty() {
167        return driver.clone();
168    }
169
170    driver
171        .iter()
172        .filter(|id| rest.iter().all(|s| s.contains(id)))
173        .copied()
174        .collect()
175}
176
177// Intersects `bucket` with one or more filter sources.
178//
179// For exactly one filter source, iterates the larger of (bucket, filter) and looks up in the
180// smaller. The larger set scans linearly (HW-prefetcher friendly) and the smaller stays hot in
181// cache, which empirically beats the size-ordered approach when the smaller filter is too
182// large to fit in L1 (e.g., a 20k-entry venue filter against a 100k-entry bucket). For two or
183// more filters the size-ordered driver is reinstated and the bucket joins the source list.
184fn intersect_pair_or_many<'a, K>(
185    bucket: &'a AHashSet<K>,
186    mut sources: Vec<&'a AHashSet<K>>,
187) -> AHashSet<K>
188where
189    K: Copy + Eq + std::hash::Hash,
190{
191    debug_assert!(!sources.is_empty());
192    if sources.len() == 1 {
193        let filter = sources[0];
194        let (larger, smaller) = if bucket.len() >= filter.len() {
195            (bucket, filter)
196        } else {
197            (filter, bucket)
198        };
199        return larger.intersection(smaller).copied().collect();
200    }
201
202    sources.push(bucket);
203    intersect_filter_sources(sources)
204}
205
/// A common in-memory `Cache` for market and execution related data.
///
/// Holds the configuration, the lookup index, an optional database adapter, and one map per
/// kind of cached object. Accounts, orders and positions are stored behind `SharedCell`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    // Configuration; validated when the cache is constructed.
    config: CacheConfig,
    // Secondary lookup index over the cached accounts, orders and positions.
    index: CacheIndex,
    // Optional persistence adapter; `None` means the cache has no backing database.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    // General entries: raw bytes keyed by string.
    general: AHashMap<String, Bytes>,
    // Reference data.
    currencies: AHashMap<Ustr, Currency>,
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    // Order books keyed by instrument.
    books: AHashMap<InstrumentId, OrderBook>,
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    // Market data held per instrument (per bar type for bars) in `BoundedVecDeque`s.
    quotes: AHashMap<InstrumentId, BoundedVecDeque<QuoteTick>>,
    trades: AHashMap<InstrumentId, BoundedVecDeque<TradeTick>>,
    // Exchange rates keyed by a currency pair.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    mark_prices: AHashMap<InstrumentId, BoundedVecDeque<MarkPriceUpdate>>,
    index_prices: AHashMap<InstrumentId, BoundedVecDeque<IndexPriceUpdate>>,
    funding_rates: AHashMap<InstrumentId, BoundedVecDeque<FundingRateUpdate>>,
    instrument_statuses: AHashMap<InstrumentId, BoundedVecDeque<InstrumentStatus>>,
    bars: AHashMap<BarType, BoundedVecDeque<Bar>>,
    // Single value per key (no history container).
    greeks: AHashMap<InstrumentId, GreeksData>,
    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
    yield_curves: AHashMap<String, YieldCurveData>,
    // Execution state; each value sits behind a `SharedCell`.
    accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
    orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
    order_lists: AHashMap<OrderListId, OrderList>,
    positions: AHashMap<PositionId, SharedCell<Position>>,
    // Snapshot blobs per position, stored as raw bytes.
    position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
    // Only present when the `defi` feature is enabled.
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
240
241impl Debug for Cache {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        f.debug_struct(stringify!(Cache))
244            .field("config", &self.config)
245            .field("index", &self.index)
246            .field("general", &self.general)
247            .field("currencies", &self.currencies)
248            .field("instruments", &self.instruments)
249            .field("synthetics", &self.synthetics)
250            .field("books", &self.books)
251            .field("own_books", &self.own_books)
252            .field("quotes", &self.quotes)
253            .field("trades", &self.trades)
254            .field("mark_xrates", &self.mark_xrates)
255            .field("mark_prices", &self.mark_prices)
256            .field("index_prices", &self.index_prices)
257            .field("funding_rates", &self.funding_rates)
258            .field("instrument_statuses", &self.instrument_statuses)
259            .field("bars", &self.bars)
260            .field("greeks", &self.greeks)
261            .field("option_greeks", &self.option_greeks)
262            .field("yield_curves", &self.yield_curves)
263            .field("accounts", &self.accounts)
264            .field("orders", &self.orders)
265            .field("order_lists", &self.order_lists)
266            .field("positions", &self.positions)
267            .field("position_snapshots", &self.position_snapshots)
268            .finish()
269    }
270}
271
272impl Default for Cache {
273    /// Creates a new default [`Cache`] instance.
274    fn default() -> Self {
275        Self::new(Some(CacheConfig::default()), None)
276    }
277}
278
279impl Cache {
280    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
281    #[must_use]
282    /// # Note
283    ///
284    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
285    ///
286    /// # Panics
287    ///
288    /// Panics if the cache config has a zero tick or bar capacity.
289    pub fn new(
290        config: Option<CacheConfig>,
291        database: Option<Box<dyn CacheDatabaseAdapter>>,
292    ) -> Self {
293        let config = config.unwrap_or_default();
294        config.validate().expect("invalid `CacheConfig`");
295
296        Self {
297            config,
298            index: CacheIndex::default(),
299            database,
300            general: AHashMap::new(),
301            currencies: AHashMap::new(),
302            instruments: AHashMap::new(),
303            synthetics: AHashMap::new(),
304            books: AHashMap::new(),
305            own_books: AHashMap::new(),
306            quotes: AHashMap::new(),
307            trades: AHashMap::new(),
308            mark_xrates: AHashMap::new(),
309            mark_prices: AHashMap::new(),
310            index_prices: AHashMap::new(),
311            funding_rates: AHashMap::new(),
312            instrument_statuses: AHashMap::new(),
313            bars: AHashMap::new(),
314            greeks: AHashMap::new(),
315            option_greeks: AHashMap::new(),
316            yield_curves: AHashMap::new(),
317            accounts: AHashMap::new(),
318            orders: AHashMap::new(),
319            order_lists: AHashMap::new(),
320            positions: AHashMap::new(),
321            position_snapshots: AHashMap::new(),
322            #[cfg(feature = "defi")]
323            defi: crate::defi::cache::DefiCache::default(),
324        }
325    }
326
327    /// Returns the cache instances memory address.
328    #[must_use]
329    pub fn memory_address(&self) -> String {
330        format!("{:?}", std::ptr::from_ref(self))
331    }
332
333    /// Sets the cache database adapter for persistence.
334    ///
335    /// This allows setting or replacing the database adapter after cache construction.
336    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
337        let type_name = std::any::type_name_of_val(&*database);
338        log::info!("Cache database adapter set: {type_name}");
339        self.database = Some(database);
340    }
341
342    // -- COMMANDS --------------------------------------------------------------------------------
343
344    /// Clears and reloads general entries from the database into the cache.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if loading general cache data fails.
349    pub fn cache_general(&mut self) -> anyhow::Result<()> {
350        self.general = match &mut self.database {
351            Some(db) => db.load()?,
352            None => AHashMap::new(),
353        };
354
355        log::info!(
356            "Cached {} general object(s) from database",
357            self.general.len()
358        );
359        Ok(())
360    }
361
362    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if loading all cache data fails.
367    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
368        let cache_map = match &self.database {
369            Some(db) => db.load_all().await?,
370            None => CacheMap::default(),
371        };
372
373        self.currencies = cache_map.currencies;
374        self.instruments = cache_map.instruments;
375        self.synthetics = cache_map.synthetics;
376        self.accounts = cache_map
377            .accounts
378            .into_iter()
379            .map(|(id, account)| (id, SharedCell::new(account)))
380            .collect();
381        self.orders = cache_map
382            .orders
383            .into_iter()
384            .map(|(id, order)| (id, SharedCell::new(order)))
385            .collect();
386        self.positions = cache_map
387            .positions
388            .into_iter()
389            .map(|(id, position)| (id, SharedCell::new(position)))
390            .collect();
391
392        self.assign_position_ids_to_contingencies();
393        Ok(())
394    }
395
396    /// Clears and reloads the currency cache from the database.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if loading currencies cache fails.
401    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
402        self.currencies = match &mut self.database {
403            Some(db) => db.load_currencies().await?,
404            None => AHashMap::new(),
405        };
406
407        log::info!("Cached {} currencies from database", self.general.len());
408        Ok(())
409    }
410
411    /// Clears and reloads the instrument cache from the database.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if loading instruments cache fails.
416    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
417        self.instruments = match &mut self.database {
418            Some(db) => db.load_instruments().await?,
419            None => AHashMap::new(),
420        };
421
422        log::info!("Cached {} instruments from database", self.general.len());
423        Ok(())
424    }
425
426    /// Clears and reloads the synthetic instrument cache from the database.
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if loading synthetic instruments cache fails.
431    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
432        self.synthetics = match &mut self.database {
433            Some(db) => db.load_synthetics().await?,
434            None => AHashMap::new(),
435        };
436
437        log::info!(
438            "Cached {} synthetic instruments from database",
439            self.general.len()
440        );
441        Ok(())
442    }
443
444    /// Clears and reloads the account cache from the database.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if loading accounts cache fails.
449    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
450        self.accounts = match &mut self.database {
451            Some(db) => db
452                .load_accounts()
453                .await?
454                .into_iter()
455                .map(|(id, account)| (id, SharedCell::new(account)))
456                .collect(),
457            None => AHashMap::new(),
458        };
459
460        log::info!(
461            "Cached {} synthetic instruments from database",
462            self.general.len()
463        );
464        Ok(())
465    }
466
467    /// Clears and reloads the order cache from the database.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if loading orders cache fails.
472    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
473        self.orders = match &mut self.database {
474            Some(db) => db
475                .load_orders()
476                .await?
477                .into_iter()
478                .map(|(id, order)| (id, SharedCell::new(order)))
479                .collect(),
480            None => AHashMap::new(),
481        };
482
483        log::info!("Cached {} orders from database", self.general.len());
484
485        self.assign_position_ids_to_contingencies();
486        Ok(())
487    }
488
489    /// Clears and reloads the position cache from the database.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if loading positions cache fails.
494    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
495        self.positions = match &mut self.database {
496            Some(db) => db
497                .load_positions()
498                .await?
499                .into_iter()
500                .map(|(id, position)| (id, SharedCell::new(position)))
501                .collect(),
502            None => AHashMap::new(),
503        };
504
505        log::info!("Cached {} positions from database", self.general.len());
506        Ok(())
507    }
508
509    /// Clears the current cache index and re-build.
510    pub fn build_index(&mut self) {
511        log::debug!("Building index");
512
513        // Index accounts
514        for account_id in self.accounts.keys() {
515            self.index
516                .venue_account
517                .insert(account_id.get_issuer(), *account_id);
518        }
519
520        // Index orders
521        for (client_order_id, order_cell) in &self.orders {
522            let order = order_cell.borrow();
523            let instrument_id = order.instrument_id();
524            let venue = instrument_id.venue;
525            let strategy_id = order.strategy_id();
526
527            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
528            self.index
529                .venue_orders
530                .entry(venue)
531                .or_default()
532                .insert(*client_order_id);
533
534            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
535            if let Some(venue_order_id) = order.venue_order_id() {
536                self.index
537                    .venue_order_ids
538                    .insert(venue_order_id, *client_order_id);
539            }
540
541            // 3: Build index.order_position -> {ClientOrderId, PositionId}
542            if let Some(position_id) = order.position_id() {
543                self.index
544                    .order_position
545                    .insert(*client_order_id, position_id);
546            }
547
548            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
549            self.index
550                .order_strategy
551                .insert(*client_order_id, order.strategy_id());
552
553            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
554            self.index
555                .instrument_orders
556                .entry(instrument_id)
557                .or_default()
558                .insert(*client_order_id);
559
560            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
561            self.index
562                .strategy_orders
563                .entry(strategy_id)
564                .or_default()
565                .insert(*client_order_id);
566
567            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
568            if let Some(account_id) = order.account_id() {
569                self.index
570                    .account_orders
571                    .entry(account_id)
572                    .or_default()
573                    .insert(*client_order_id);
574            }
575
576            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
577            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
578                self.index
579                    .exec_algorithm_orders
580                    .entry(exec_algorithm_id)
581                    .or_default()
582                    .insert(*client_order_id);
583            }
584
585            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
586            if let Some(exec_spawn_id) = order.exec_spawn_id() {
587                self.index
588                    .exec_spawn_orders
589                    .entry(exec_spawn_id)
590                    .or_default()
591                    .insert(*client_order_id);
592            }
593
594            // 9: Build index.orders -> {ClientOrderId}
595            self.index.orders.insert(*client_order_id);
596
597            // 10: Build index.orders_active_local -> {ClientOrderId}
598            if order.is_active_local() {
599                self.index.orders_active_local.insert(*client_order_id);
600            }
601
602            // 11: Build index.orders_open -> {ClientOrderId}
603            if order.is_open() {
604                self.index.orders_open.insert(*client_order_id);
605            }
606
607            // 12: Build index.orders_closed -> {ClientOrderId}
608            if order.is_closed() {
609                self.index.orders_closed.insert(*client_order_id);
610            }
611
612            // 13: Build index.orders_emulated -> {ClientOrderId}
613            if let Some(emulation_trigger) = order.emulation_trigger()
614                && emulation_trigger != TriggerType::NoTrigger
615                && !order.is_closed()
616            {
617                self.index.orders_emulated.insert(*client_order_id);
618            }
619
620            // 14: Build index.orders_inflight -> {ClientOrderId}
621            if order.is_inflight() {
622                self.index.orders_inflight.insert(*client_order_id);
623            }
624
625            // 15: Build index.strategies -> {StrategyId}
626            self.index.strategies.insert(strategy_id);
627
628            // 16: Build index.strategies -> {ExecAlgorithmId}
629            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
630                self.index.exec_algorithms.insert(exec_algorithm_id);
631            }
632        }
633
634        // Index positions
635        for (position_id, position_cell) in &self.positions {
636            let position = position_cell.borrow();
637            let instrument_id = position.instrument_id;
638            let venue = instrument_id.venue;
639            let strategy_id = position.strategy_id;
640
641            // 1: Build index.venue_positions -> {Venue, {PositionId}}
642            self.index
643                .venue_positions
644                .entry(venue)
645                .or_default()
646                .insert(*position_id);
647
648            // 2: Build index.position_strategy -> {PositionId, StrategyId}
649            self.index
650                .position_strategy
651                .insert(*position_id, position.strategy_id);
652
653            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
654            self.index
655                .position_orders
656                .entry(*position_id)
657                .or_default()
658                .extend(position.client_order_ids());
659
660            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
661            self.index
662                .instrument_positions
663                .entry(instrument_id)
664                .or_default()
665                .insert(*position_id);
666
667            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
668            self.index
669                .strategy_positions
670                .entry(strategy_id)
671                .or_default()
672                .insert(*position_id);
673
674            // 6: Build index.account_positions -> {AccountId, {PositionId}}
675            self.index
676                .account_positions
677                .entry(position.account_id)
678                .or_default()
679                .insert(*position_id);
680
681            // 7: Build index.positions -> {PositionId}
682            self.index.positions.insert(*position_id);
683
684            // 8: Build index.positions_open -> {PositionId}
685            if position.is_open() {
686                self.index.positions_open.insert(*position_id);
687            }
688
689            // 9: Build index.positions_closed -> {PositionId}
690            if position.is_closed() {
691                self.index.positions_closed.insert(*position_id);
692            }
693
694            // 10: Build index.strategies -> {StrategyId}
695            self.index.strategies.insert(strategy_id);
696        }
697    }
698
699    /// Returns whether the cache has a backing database.
700    #[must_use]
701    pub const fn has_backing(&self) -> bool {
702        self.database.is_some()
703    }
704
705    // Calculate the unrealized profit and loss (PnL) for `position`.
706    #[must_use]
707    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
708        let Some(quote) = self.quote(&position.instrument_id) else {
709            log::warn!(
710                "Cannot calculate unrealized PnL for {}, no quotes for {}",
711                position.id,
712                position.instrument_id
713            );
714            return None;
715        };
716
717        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
718        let last = match position.side {
719            PositionSide::Flat | PositionSide::NoPositionSide => {
720                return Some(Money::new(0.0, position.settlement_currency));
721            }
722            PositionSide::Long => quote.bid_price,
723            PositionSide::Short => quote.ask_price,
724        };
725
726        Some(position.unrealized_pnl(last))
727    }
728
729    /// Checks integrity of data within the cache.
730    ///
731    /// All data should be loaded from the database prior to this call.
732    /// If an error is found then a log error message will also be produced.
733    ///
734    /// # Panics
735    ///
736    /// Panics if failure calling system clock.
737    #[must_use]
738    pub fn check_integrity(&mut self) -> bool {
739        let mut error_count = 0;
740        let failure = "Integrity failure";
741
742        // Get current timestamp in microseconds
743        let timestamp_us = SystemTime::now()
744            .duration_since(UNIX_EPOCH)
745            .expect("Time went backwards")
746            .as_micros();
747
748        log::info!("Checking data integrity");
749
750        // Check object caches
751        for account_id in self.accounts.keys() {
752            if !self
753                .index
754                .venue_account
755                .contains_key(&account_id.get_issuer())
756            {
757                log::error!(
758                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
759                );
760                error_count += 1;
761            }
762        }
763
764        for (client_order_id, order_cell) in &self.orders {
765            let order = order_cell.borrow();
766
767            if !self.index.order_strategy.contains_key(client_order_id) {
768                log::error!(
769                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
770                );
771                error_count += 1;
772            }
773
774            if !self.index.orders.contains(client_order_id) {
775                log::error!(
776                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
777                );
778                error_count += 1;
779            }
780
781            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
782                log::error!(
783                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
784                );
785                error_count += 1;
786            }
787
788            if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
789            {
790                log::error!(
791                    "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
792                );
793                error_count += 1;
794            }
795
796            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
797                log::error!(
798                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
799                );
800                error_count += 1;
801            }
802
803            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
804                log::error!(
805                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
806                );
807                error_count += 1;
808            }
809
810            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
811                if !self
812                    .index
813                    .exec_algorithm_orders
814                    .contains_key(&exec_algorithm_id)
815                {
816                    log::error!(
817                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
818                    );
819                    error_count += 1;
820                }
821
822                if order.exec_spawn_id().is_none()
823                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
824                {
825                    log::error!(
826                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
827                    );
828                    error_count += 1;
829                }
830            }
831        }
832
833        for (position_id, position_cell) in &self.positions {
834            let position = position_cell.borrow();
835
836            if !self.index.position_strategy.contains_key(position_id) {
837                log::error!(
838                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
839                );
840                error_count += 1;
841            }
842
843            if !self.index.position_orders.contains_key(position_id) {
844                log::error!(
845                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
846                );
847                error_count += 1;
848            }
849
850            if !self.index.positions.contains(position_id) {
851                log::error!(
852                    "{failure} in positions: {position_id} not found in `self.index.positions`",
853                );
854                error_count += 1;
855            }
856
857            if position.is_open() && !self.index.positions_open.contains(position_id) {
858                log::error!(
859                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
860                );
861                error_count += 1;
862            }
863
864            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
865                log::error!(
866                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
867                );
868                error_count += 1;
869            }
870        }
871
872        // Check indexes
873        for account_id in self.index.venue_account.values() {
874            if !self.accounts.contains_key(account_id) {
875                log::error!(
876                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
877                );
878                error_count += 1;
879            }
880        }
881
882        for client_order_id in self.index.venue_order_ids.values() {
883            if !self.orders.contains_key(client_order_id) {
884                log::error!(
885                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
886                );
887                error_count += 1;
888            }
889        }
890
891        for client_order_id in self.index.client_order_ids.keys() {
892            if !self.orders.contains_key(client_order_id) {
893                log::error!(
894                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
895                );
896                error_count += 1;
897            }
898        }
899
900        for client_order_id in self.index.order_position.keys() {
901            if !self.orders.contains_key(client_order_id) {
902                log::error!(
903                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
904                );
905                error_count += 1;
906            }
907        }
908
909        // Check indexes
910        for client_order_id in self.index.order_strategy.keys() {
911            if !self.orders.contains_key(client_order_id) {
912                log::error!(
913                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
914                );
915                error_count += 1;
916            }
917        }
918
919        for position_id in self.index.position_strategy.keys() {
920            if !self.positions.contains_key(position_id) {
921                log::error!(
922                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
923                );
924                error_count += 1;
925            }
926        }
927
928        for position_id in self.index.position_orders.keys() {
929            if !self.positions.contains_key(position_id) {
930                log::error!(
931                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
932                );
933                error_count += 1;
934            }
935        }
936
937        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
938            for client_order_id in client_order_ids {
939                if !self.orders.contains_key(client_order_id) {
940                    log::error!(
941                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
942                    );
943                    error_count += 1;
944                }
945            }
946        }
947
948        for instrument_id in self.index.instrument_positions.keys() {
949            if !self.index.instrument_orders.contains_key(instrument_id) {
950                log::error!(
951                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
952                );
953                error_count += 1;
954            }
955        }
956
957        for client_order_ids in self.index.strategy_orders.values() {
958            for client_order_id in client_order_ids {
959                if !self.orders.contains_key(client_order_id) {
960                    log::error!(
961                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
962                    );
963                    error_count += 1;
964                }
965            }
966        }
967
968        for position_ids in self.index.strategy_positions.values() {
969            for position_id in position_ids {
970                if !self.positions.contains_key(position_id) {
971                    log::error!(
972                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
973                    );
974                    error_count += 1;
975                }
976            }
977        }
978
979        for client_order_id in &self.index.orders {
980            if !self.orders.contains_key(client_order_id) {
981                log::error!(
982                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
983                );
984                error_count += 1;
985            }
986        }
987
988        for client_order_id in &self.index.orders_emulated {
989            if !self.orders.contains_key(client_order_id) {
990                log::error!(
991                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
992                );
993                error_count += 1;
994            }
995        }
996
997        for client_order_id in &self.index.orders_active_local {
998            if !self.orders.contains_key(client_order_id) {
999                log::error!(
1000                    "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
1001                );
1002                error_count += 1;
1003            }
1004        }
1005
1006        for client_order_id in &self.index.orders_inflight {
1007            if !self.orders.contains_key(client_order_id) {
1008                log::error!(
1009                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
1010                );
1011                error_count += 1;
1012            }
1013        }
1014
1015        for client_order_id in &self.index.orders_open {
1016            if !self.orders.contains_key(client_order_id) {
1017                log::error!(
1018                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
1019                );
1020                error_count += 1;
1021            }
1022        }
1023
1024        for client_order_id in &self.index.orders_closed {
1025            if !self.orders.contains_key(client_order_id) {
1026                log::error!(
1027                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
1028                );
1029                error_count += 1;
1030            }
1031        }
1032
1033        for position_id in &self.index.positions {
1034            if !self.positions.contains_key(position_id) {
1035                log::error!(
1036                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
1037                );
1038                error_count += 1;
1039            }
1040        }
1041
1042        for position_id in &self.index.positions_open {
1043            if !self.positions.contains_key(position_id) {
1044                log::error!(
1045                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
1046                );
1047                error_count += 1;
1048            }
1049        }
1050
1051        for position_id in &self.index.positions_closed {
1052            if !self.positions.contains_key(position_id) {
1053                log::error!(
1054                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
1055                );
1056                error_count += 1;
1057            }
1058        }
1059
1060        for strategy_id in &self.index.strategies {
1061            if !self.index.strategy_orders.contains_key(strategy_id) {
1062                log::error!(
1063                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
1064                );
1065                error_count += 1;
1066            }
1067        }
1068
1069        for exec_algorithm_id in &self.index.exec_algorithms {
1070            if !self
1071                .index
1072                .exec_algorithm_orders
1073                .contains_key(exec_algorithm_id)
1074            {
1075                log::error!(
1076                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
1077                );
1078                error_count += 1;
1079            }
1080        }
1081
1082        let total_us = SystemTime::now()
1083            .duration_since(UNIX_EPOCH)
1084            .expect("Time went backwards")
1085            .as_micros()
1086            - timestamp_us;
1087
1088        if error_count == 0 {
1089            log::info!("Integrity check passed in {total_us}μs");
1090            true
1091        } else {
1092            log::error!(
1093                "Integrity check failed with {error_count} error{} in {total_us}μs",
1094                if error_count == 1 { "" } else { "s" },
1095            );
1096            false
1097        }
1098    }
1099
1100    /// Checks for any residual open state and log warnings if any are found.
1101    ///
1102    ///'Open state' is considered to be open orders and open positions.
1103    #[must_use]
1104    pub fn check_residuals(&self) -> bool {
1105        log::debug!("Checking residuals");
1106
1107        let mut residuals = false;
1108
1109        // Check for any open orders
1110        for order in self.orders_open(None, None, None, None, None) {
1111            residuals = true;
1112            log::warn!("Residual {order}");
1113        }
1114
1115        // Check for any open positions
1116        for position in self.positions_open(None, None, None, None, None) {
1117            residuals = true;
1118            log::warn!("Residual {position}");
1119        }
1120
1121        residuals
1122    }
1123
1124    /// Purges all closed orders from the cache that are older than `buffer_secs`.
1125    ///
1126    ///
1127    /// Only orders that have been closed for at least this amount of time will be purged.
1128    /// A value of 0 means purge all closed orders regardless of when they were closed.
1129    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1130        log::debug!(
1131            "Purging closed orders{}",
1132            if buffer_secs > 0 {
1133                format!(" with buffer_secs={buffer_secs}")
1134            } else {
1135                String::new()
1136            }
1137        );
1138
1139        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1140
1141        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
1142
1143        'outer: for client_order_id in self.index.orders_closed.clone() {
1144            let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
1145                let order = order_cell.borrow();
1146                if order.is_closed()
1147                    && let Some(ts_closed) = order.ts_closed()
1148                    && ts_closed + buffer_ns <= ts_now
1149                {
1150                    let linked = order.linked_order_ids().map(<[_]>::to_vec);
1151                    let order_list_id = order.order_list_id();
1152                    Some((linked, order_list_id))
1153                } else {
1154                    None
1155                }
1156            });
1157
1158            let Some((linked, order_list_id)) = purge_target else {
1159                continue;
1160            };
1161
1162            // Check any linked orders (contingency orders)
1163            if let Some(linked_order_ids) = linked {
1164                for linked_order_id in &linked_order_ids {
1165                    if let Some(linked_order_cell) = self.orders.get(linked_order_id)
1166                        && linked_order_cell.borrow().is_open()
1167                    {
1168                        // Do not purge if linked order still open
1169                        continue 'outer;
1170                    }
1171                }
1172            }
1173
1174            if let Some(order_list_id) = order_list_id {
1175                affected_order_list_ids.insert(order_list_id);
1176            }
1177
1178            self.purge_order(client_order_id);
1179        }
1180
1181        for order_list_id in affected_order_list_ids {
1182            if let Some(order_list) = self.order_lists.get(&order_list_id) {
1183                let all_purged = order_list
1184                    .client_order_ids
1185                    .iter()
1186                    .all(|id| !self.orders.contains_key(id));
1187
1188                if all_purged {
1189                    self.order_lists.remove(&order_list_id);
1190                    log::info!("Purged {order_list_id}");
1191                }
1192            }
1193        }
1194    }
1195
1196    /// Purges all closed positions from the cache that are older than `buffer_secs`.
1197    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1198        log::debug!(
1199            "Purging closed positions{}",
1200            if buffer_secs > 0 {
1201                format!(" with buffer_secs={buffer_secs}")
1202            } else {
1203                String::new()
1204            }
1205        );
1206
1207        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1208
1209        for position_id in self.index.positions_closed.clone() {
1210            let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
1211                let position = cell.borrow();
1212                position.is_closed()
1213                    && position
1214                        .ts_closed
1215                        .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
1216            });
1217
1218            if should_purge {
1219                self.purge_position(position_id);
1220            }
1221        }
1222    }
1223
1224    /// Purges the order with the `client_order_id` from the cache (if found).
1225    ///
1226    /// For safety, an order is prevented from being purged if it's open.
1227    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1228        // Check if order exists and is safe to purge before removing
1229        let order_cell = self.orders.get(&client_order_id).cloned();
1230
1231        // Prevent purging open orders
1232        if let Some(ref order_cell) = order_cell
1233            && order_cell.borrow().is_open()
1234        {
1235            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1236            return;
1237        }
1238
1239        // If order exists in cache, remove it and clean up order-specific indices
1240        if let Some(ref order_cell) = order_cell {
1241            let order = order_cell.borrow();
1242            // Safe to purge
1243            self.orders.remove(&client_order_id);
1244
1245            // Remove order from venue index
1246            if let Some(venue_orders) = self
1247                .index
1248                .venue_orders
1249                .get_mut(&order.instrument_id().venue)
1250            {
1251                venue_orders.remove(&client_order_id);
1252                if venue_orders.is_empty() {
1253                    self.index.venue_orders.remove(&order.instrument_id().venue);
1254                }
1255            }
1256
1257            // Remove venue order ID index if exists
1258            if let Some(venue_order_id) = order.venue_order_id() {
1259                self.index.venue_order_ids.remove(&venue_order_id);
1260            }
1261
1262            // Remove from instrument orders index
1263            if let Some(instrument_orders) =
1264                self.index.instrument_orders.get_mut(&order.instrument_id())
1265            {
1266                instrument_orders.remove(&client_order_id);
1267                if instrument_orders.is_empty() {
1268                    self.index.instrument_orders.remove(&order.instrument_id());
1269                }
1270            }
1271
1272            // Remove from position orders index if associated with a position
1273            if let Some(position_id) = order.position_id()
1274                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1275            {
1276                position_orders.remove(&client_order_id);
1277                if position_orders.is_empty() {
1278                    self.index.position_orders.remove(&position_id);
1279                }
1280            }
1281
1282            // Remove from exec algorithm orders index if it has an exec algorithm
1283            if let Some(exec_algorithm_id) = order.exec_algorithm_id()
1284                && let Some(exec_algorithm_orders) =
1285                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1286            {
1287                exec_algorithm_orders.remove(&client_order_id);
1288                if exec_algorithm_orders.is_empty() {
1289                    self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1290                }
1291            }
1292
1293            // Clean up strategy orders reverse index
1294            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
1295            {
1296                strategy_orders.remove(&client_order_id);
1297                if strategy_orders.is_empty() {
1298                    self.index.strategy_orders.remove(&order.strategy_id());
1299                }
1300            }
1301
1302            // Clean up account orders index
1303            if let Some(account_id) = order.account_id()
1304                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1305            {
1306                account_orders.remove(&client_order_id);
1307                if account_orders.is_empty() {
1308                    self.index.account_orders.remove(&account_id);
1309                }
1310            }
1311
1312            // Clean up exec spawn reverse index (if this order is a spawned child)
1313            if let Some(exec_spawn_id) = order.exec_spawn_id()
1314                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1315            {
1316                spawn_orders.remove(&client_order_id);
1317                if spawn_orders.is_empty() {
1318                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1319                }
1320            }
1321
1322            log::info!("Purged order {client_order_id}");
1323        } else {
1324            log::warn!("Order {client_order_id} not found when purging");
1325        }
1326
1327        // Always clean up order indices (even if order was not in cache)
1328        self.index.order_position.remove(&client_order_id);
1329        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1330        self.index.order_client.remove(&client_order_id);
1331        self.index.client_order_ids.remove(&client_order_id);
1332
1333        // Clean up reverse index when order not in cache (using forward index)
1334        if let Some(strategy_id) = strategy_id
1335            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1336        {
1337            strategy_orders.remove(&client_order_id);
1338            if strategy_orders.is_empty() {
1339                self.index.strategy_orders.remove(&strategy_id);
1340            }
1341        }
1342
1343        // Remove spawn parent entry if this order was a spawn root
1344        self.index.exec_spawn_orders.remove(&client_order_id);
1345
1346        self.index.orders.remove(&client_order_id);
1347        self.index.orders_active_local.remove(&client_order_id);
1348        self.index.orders_open.remove(&client_order_id);
1349        self.index.orders_closed.remove(&client_order_id);
1350        self.index.orders_emulated.remove(&client_order_id);
1351        self.index.orders_inflight.remove(&client_order_id);
1352        self.index.orders_pending_cancel.remove(&client_order_id);
1353    }
1354
1355    /// Purges the position with the `position_id` from the cache (if found).
1356    ///
1357    /// For safety, a position is prevented from being purged if it's open.
1358    pub fn purge_position(&mut self, position_id: PositionId) {
1359        // Snapshot the position so we can release the borrow before mutating indexes.
1360        let position = self
1361            .positions
1362            .get(&position_id)
1363            .map(|cell| cell.borrow().clone());
1364
1365        // Prevent purging open positions
1366        if let Some(ref pos) = position
1367            && pos.is_open()
1368        {
1369            log::warn!("Position {position_id} found open when purging, skipping purge");
1370            return;
1371        }
1372
1373        // If position exists in cache, remove it and clean up position-specific indices
1374        if let Some(ref pos) = position {
1375            self.positions.remove(&position_id);
1376
1377            // Remove from venue positions index
1378            if let Some(venue_positions) =
1379                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1380            {
1381                venue_positions.remove(&position_id);
1382                if venue_positions.is_empty() {
1383                    self.index.venue_positions.remove(&pos.instrument_id.venue);
1384                }
1385            }
1386
1387            // Remove from instrument positions index
1388            if let Some(instrument_positions) =
1389                self.index.instrument_positions.get_mut(&pos.instrument_id)
1390            {
1391                instrument_positions.remove(&position_id);
1392                if instrument_positions.is_empty() {
1393                    self.index.instrument_positions.remove(&pos.instrument_id);
1394                }
1395            }
1396
1397            // Remove from strategy positions index
1398            if let Some(strategy_positions) =
1399                self.index.strategy_positions.get_mut(&pos.strategy_id)
1400            {
1401                strategy_positions.remove(&position_id);
1402                if strategy_positions.is_empty() {
1403                    self.index.strategy_positions.remove(&pos.strategy_id);
1404                }
1405            }
1406
1407            // Remove from account positions index
1408            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1409                account_positions.remove(&position_id);
1410                if account_positions.is_empty() {
1411                    self.index.account_positions.remove(&pos.account_id);
1412                }
1413            }
1414
1415            // Remove position ID from orders that reference it
1416            for client_order_id in pos.client_order_ids() {
1417                self.index.order_position.remove(&client_order_id);
1418            }
1419
1420            log::info!("Purged position {position_id}");
1421        } else {
1422            log::warn!("Position {position_id} not found when purging");
1423        }
1424
1425        // Always clean up position indices (even if position not in cache)
1426        self.index.position_strategy.remove(&position_id);
1427        self.index.position_orders.remove(&position_id);
1428        self.index.positions.remove(&position_id);
1429        self.index.positions_open.remove(&position_id);
1430        self.index.positions_closed.remove(&position_id);
1431
1432        // Always clean up position snapshots (even if position not in cache)
1433        self.position_snapshots.remove(&position_id);
1434    }
1435
1436    /// Purges the instrument with the `instrument_id` from the cache (if found).
1437    ///
1438    /// All cache-owned data keyed by the instrument is removed: the instrument record,
1439    /// any synthetic with the same id, order book and own-order-book state, quote/trade
1440    /// histories, mark/index/funding price histories, instrument status, bars for any
1441    /// `BarType` referencing the instrument, and the `instrument_orders` /
1442    /// `instrument_positions` index entries.
1443    ///
1444    /// For safety, an instrument is prevented from being purged while any associated
1445    /// order is non-terminal (anything not in `orders_closed`, including
1446    /// initialized, submitted, accepted, emulated, released, or inflight states) or
1447    /// any associated position is non-closed.
1448    ///
1449    /// Active subscriptions and other live data-engine state are not touched here;
1450    /// those belong to the data and execution engines.
1451    ///
1452    /// # Warning
1453    ///
1454    /// Intended for actors and strategies that have their own lifecycle logic for
1455    /// deciding when an instrument is no longer needed. Purging an instrument that any
1456    /// other actor, strategy, or engine still relies on may cause incorrect behavior
1457    /// (missing instrument lookups, lost market-data history). The caller is
1458    /// responsible for ensuring the instrument is no longer in use before purging.
1459    pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
1460        #[cfg(feature = "defi")]
1461        let defi_found = self.defi.pools.contains_key(&instrument_id)
1462            || self.defi.pool_profilers.contains_key(&instrument_id);
1463        #[cfg(not(feature = "defi"))]
1464        let defi_found = false;
1465
1466        let found = self.instruments.contains_key(&instrument_id)
1467            || self.synthetics.contains_key(&instrument_id)
1468            || defi_found;
1469
1470        if !found {
1471            log::warn!("Instrument {instrument_id} not found when purging");
1472            return;
1473        }
1474
1475        if let Some(orders) = self.index.instrument_orders.get(&instrument_id) {
1476            let has_non_terminal = orders
1477                .iter()
1478                .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
1479
1480            if has_non_terminal {
1481                log::warn!(
1482                    "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
1483                );
1484                return;
1485            }
1486        }
1487
1488        if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
1489            let has_non_closed = positions
1490                .iter()
1491                .any(|position_id| !self.index.positions_closed.contains(position_id));
1492
1493            if has_non_closed {
1494                log::warn!(
1495                    "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
1496                );
1497                return;
1498            }
1499        }
1500
1501        self.instruments.remove(&instrument_id);
1502        self.synthetics.remove(&instrument_id);
1503        self.books.remove(&instrument_id);
1504        self.own_books.remove(&instrument_id);
1505        self.quotes.remove(&instrument_id);
1506        self.trades.remove(&instrument_id);
1507        self.mark_prices.remove(&instrument_id);
1508        self.index_prices.remove(&instrument_id);
1509        self.funding_rates.remove(&instrument_id);
1510        self.instrument_statuses.remove(&instrument_id);
1511        self.greeks.remove(&instrument_id);
1512        self.option_greeks.remove(&instrument_id);
1513
1514        self.bars
1515            .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
1516
1517        #[cfg(feature = "defi")]
1518        {
1519            self.defi.pools.remove(&instrument_id);
1520            self.defi.pool_profilers.remove(&instrument_id);
1521        }
1522
1523        self.index.instrument_orders.remove(&instrument_id);
1524        self.index.instrument_positions.remove(&instrument_id);
1525
1526        log::info!("Purged instrument {instrument_id}");
1527    }
1528
1529    /// Purges all account state events which are outside the lookback window.
1530    ///
1531    /// Only events which are outside the lookback window will be purged.
1532    /// A value of 0 means purge all account state events.
1533    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1534        log::debug!(
1535            "Purging account events{}",
1536            if lookback_secs > 0 {
1537                format!(" with lookback_secs={lookback_secs}")
1538            } else {
1539                String::new()
1540            }
1541        );
1542
1543        for account_cell in self.accounts.values() {
1544            let mut account = account_cell.borrow_mut();
1545            let event_count = account.event_count();
1546            account.purge_account_events(ts_now, lookback_secs);
1547            let count_diff = event_count - account.event_count();
1548            if count_diff > 0 {
1549                log::info!(
1550                    "Purged {} event(s) from account {}",
1551                    count_diff,
1552                    account.id()
1553                );
1554            }
1555        }
1556    }
1557
1558    /// Clears the caches index.
1559    pub fn clear_index(&mut self) {
1560        self.index.clear();
1561        log::debug!("Cleared index");
1562    }
1563
1564    /// Resets the cache.
1565    ///
1566    /// All stateful fields are reset to their initial value. Instruments,
1567    /// currencies and synthetics are retained when `drop_instruments_on_reset`
1568    /// is `false` so that repeated backtest runs can reuse the same dataset.
1569    pub fn reset(&mut self) {
1570        log::debug!("Resetting cache");
1571
1572        self.general.clear();
1573        self.books.clear();
1574        self.own_books.clear();
1575        self.quotes.clear();
1576        self.trades.clear();
1577        self.mark_xrates.clear();
1578        self.mark_prices.clear();
1579        self.index_prices.clear();
1580        self.funding_rates.clear();
1581        self.instrument_statuses.clear();
1582        self.bars.clear();
1583        self.accounts.clear();
1584        self.orders.clear();
1585        self.order_lists.clear();
1586        self.positions.clear();
1587        self.position_snapshots.clear();
1588        self.greeks.clear();
1589        self.yield_curves.clear();
1590
1591        if self.config.drop_instruments_on_reset {
1592            self.currencies.clear();
1593            self.instruments.clear();
1594            self.synthetics.clear();
1595        }
1596
1597        #[cfg(feature = "defi")]
1598        {
1599            self.defi.pools.clear();
1600            self.defi.pool_profilers.clear();
1601        }
1602
1603        self.clear_index();
1604
1605        log::info!("Reset cache");
1606    }
1607
1608    /// Dispose of the cache which will close any underlying database adapter.
1609    ///
1610    /// If closing the database connection fails, an error is logged.
1611    pub fn dispose(&mut self) {
1612        self.reset();
1613
1614        if let Some(database) = &mut self.database
1615            && let Err(e) = database.close()
1616        {
1617            log::error!("Failed to close database during dispose: {e}");
1618        }
1619    }
1620
1621    /// Flushes the caches database which permanently removes all persisted data.
1622    ///
1623    /// If flushing the database connection fails, an error is logged.
1624    pub fn flush_db(&mut self) {
1625        if let Some(database) = &mut self.database
1626            && let Err(e) = database.flush()
1627        {
1628            log::error!("Failed to flush database: {e}");
1629        }
1630    }
1631
1632    /// Adds a raw bytes `value` to the cache under the `key`.
1633    ///
1634    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if persisting the entry to the backing database fails.
1639    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1640        check_valid_string_ascii(key, stringify!(key))?;
1641        check_predicate_false(value.is_empty(), stringify!(value))?;
1642
1643        log::debug!("Adding general {key}");
1644        self.general.insert(key.to_string(), value.clone());
1645
1646        if let Some(database) = &mut self.database {
1647            database.add(key.to_string(), value)?;
1648        }
1649        Ok(())
1650    }
1651
1652    /// Adds an `OrderBook` to the cache.
1653    ///
1654    /// # Errors
1655    ///
1656    /// Returns an error if persisting the order book to the backing database fails.
1657    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1658        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1659
1660        if self.config.save_market_data
1661            && let Some(database) = &mut self.database
1662        {
1663            database.add_order_book(&book)?;
1664        }
1665
1666        self.books.insert(book.instrument_id, book);
1667        Ok(())
1668    }
1669
1670    /// Adds an `OwnOrderBook` to the cache.
1671    ///
1672    /// # Errors
1673    ///
1674    /// Returns an error if persisting the own order book fails.
1675    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1676        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1677
1678        self.own_books.insert(own_book.instrument_id, own_book);
1679        Ok(())
1680    }
1681
1682    /// Adds the `mark_price` update to the cache.
1683    ///
1684    /// # Errors
1685    ///
1686    /// Returns an error if persisting the mark price to the backing database fails.
1687    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1688        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1689
1690        if self.config.save_market_data {
1691            // TODO: Placeholder and return Result for consistency
1692        }
1693
1694        let mark_prices_deque = self
1695            .mark_prices
1696            .entry(mark_price.instrument_id)
1697            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1698        mark_prices_deque.push_front(mark_price);
1699        Ok(())
1700    }
1701
1702    /// Adds the `index_price` update to the cache.
1703    ///
1704    /// # Errors
1705    ///
1706    /// Returns an error if persisting the index price to the backing database fails.
1707    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1708        log::debug!(
1709            "Adding `IndexPriceUpdate` for {}",
1710            index_price.instrument_id
1711        );
1712
1713        if self.config.save_market_data {
1714            // TODO: Placeholder and return Result for consistency
1715        }
1716
1717        let index_prices_deque = self
1718            .index_prices
1719            .entry(index_price.instrument_id)
1720            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1721        index_prices_deque.push_front(index_price);
1722        Ok(())
1723    }
1724
1725    /// Adds the `funding_rate` update to the cache.
1726    ///
1727    /// # Errors
1728    ///
1729    /// Returns an error if persisting the funding rate update to the backing database fails.
1730    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1731        log::debug!(
1732            "Adding `FundingRateUpdate` for {}",
1733            funding_rate.instrument_id
1734        );
1735
1736        if self.config.save_market_data {
1737            // TODO: Placeholder and return Result for consistency
1738        }
1739
1740        let funding_rates_deque = self
1741            .funding_rates
1742            .entry(funding_rate.instrument_id)
1743            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1744        funding_rates_deque.push_front(funding_rate);
1745        Ok(())
1746    }
1747
1748    /// Adds the given `funding rates` to the cache.
1749    ///
1750    /// # Errors
1751    ///
1752    /// Returns an error if persisting the trade ticks to the backing database fails.
1753    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1754        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1755
1756        let instrument_id = funding_rates[0].instrument_id;
1757        log::debug!(
1758            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1759            funding_rates.len()
1760        );
1761
1762        if self.config.save_market_data
1763            && let Some(database) = &mut self.database
1764        {
1765            for funding_rate in funding_rates {
1766                database.add_funding_rate(funding_rate)?;
1767            }
1768        }
1769
1770        let funding_rate_deque = self
1771            .funding_rates
1772            .entry(instrument_id)
1773            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1774
1775        for funding_rate in funding_rates {
1776            funding_rate_deque.push_front(*funding_rate);
1777        }
1778        Ok(())
1779    }
1780
1781    /// Adds the `instrument_status` update to the cache.
1782    ///
1783    /// # Errors
1784    ///
1785    /// Returns an error if persisting the instrument status to the backing database fails.
1786    pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1787        log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1788
1789        if self.config.save_market_data {
1790            // TODO: Placeholder and return Result for consistency
1791        }
1792
1793        let statuses_deque = self
1794            .instrument_statuses
1795            .entry(status.instrument_id)
1796            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1797        statuses_deque.push_front(status);
1798        Ok(())
1799    }
1800
1801    /// Adds the `quote` tick to the cache.
1802    ///
1803    /// # Errors
1804    ///
1805    /// Returns an error if persisting the quote tick to the backing database fails.
1806    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1807        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1808
1809        if self.config.save_market_data
1810            && let Some(database) = &mut self.database
1811        {
1812            database.add_quote(&quote)?;
1813        }
1814
1815        let quotes_deque = self
1816            .quotes
1817            .entry(quote.instrument_id)
1818            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1819        quotes_deque.push_front(quote);
1820        Ok(())
1821    }
1822
1823    /// Adds the `quotes` to the cache.
1824    ///
1825    /// # Errors
1826    ///
1827    /// Returns an error if persisting the quote ticks to the backing database fails.
1828    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1829        check_slice_not_empty(quotes, stringify!(quotes))?;
1830
1831        let instrument_id = quotes[0].instrument_id;
1832        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1833
1834        if self.config.save_market_data
1835            && let Some(database) = &mut self.database
1836        {
1837            for quote in quotes {
1838                database.add_quote(quote)?;
1839            }
1840        }
1841
1842        let quotes_deque = self
1843            .quotes
1844            .entry(instrument_id)
1845            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1846
1847        for quote in quotes {
1848            quotes_deque.push_front(*quote);
1849        }
1850        Ok(())
1851    }
1852
1853    /// Adds the `trade` tick to the cache.
1854    ///
1855    /// # Errors
1856    ///
1857    /// Returns an error if persisting the trade tick to the backing database fails.
1858    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1859        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1860
1861        if self.config.save_market_data
1862            && let Some(database) = &mut self.database
1863        {
1864            database.add_trade(&trade)?;
1865        }
1866
1867        let trades_deque = self
1868            .trades
1869            .entry(trade.instrument_id)
1870            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1871        trades_deque.push_front(trade);
1872        Ok(())
1873    }
1874
1875    /// Adds the give `trades` to the cache.
1876    ///
1877    /// # Errors
1878    ///
1879    /// Returns an error if persisting the trade ticks to the backing database fails.
1880    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1881        check_slice_not_empty(trades, stringify!(trades))?;
1882
1883        let instrument_id = trades[0].instrument_id;
1884        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1885
1886        if self.config.save_market_data
1887            && let Some(database) = &mut self.database
1888        {
1889            for trade in trades {
1890                database.add_trade(trade)?;
1891            }
1892        }
1893
1894        let trades_deque = self
1895            .trades
1896            .entry(instrument_id)
1897            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
1898
1899        for trade in trades {
1900            trades_deque.push_front(*trade);
1901        }
1902        Ok(())
1903    }
1904
1905    /// Adds the `bar` to the cache.
1906    ///
1907    /// # Errors
1908    ///
1909    /// Returns an error if persisting the bar to the backing database fails.
1910    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1911        log::debug!("Adding `Bar` {}", bar.bar_type);
1912
1913        if self.config.save_market_data
1914            && let Some(database) = &mut self.database
1915        {
1916            database.add_bar(&bar)?;
1917        }
1918
1919        let bars = self
1920            .bars
1921            .entry(bar.bar_type)
1922            .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
1923        bars.push_front(bar);
1924        Ok(())
1925    }
1926
1927    /// Adds the `bars` to the cache.
1928    ///
1929    /// # Errors
1930    ///
1931    /// Returns an error if persisting the bars to the backing database fails.
1932    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1933        check_slice_not_empty(bars, stringify!(bars))?;
1934
1935        let bar_type = bars[0].bar_type;
1936        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1937
1938        if self.config.save_market_data
1939            && let Some(database) = &mut self.database
1940        {
1941            for bar in bars {
1942                database.add_bar(bar)?;
1943            }
1944        }
1945
1946        let bars_deque = self
1947            .bars
1948            .entry(bar_type)
1949            .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
1950
1951        for bar in bars {
1952            bars_deque.push_front(*bar);
1953        }
1954        Ok(())
1955    }
1956
1957    /// Adds the `greeks` data to the cache.
1958    ///
1959    /// # Errors
1960    ///
1961    /// Returns an error if persisting the greeks data to the backing database fails.
1962    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1963        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1964
1965        if self.config.save_market_data
1966            && let Some(_database) = &mut self.database
1967        {
1968            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1969        }
1970
1971        self.greeks.insert(greeks.instrument_id, greeks);
1972        Ok(())
1973    }
1974
1975    /// Gets the greeks data for the `instrument_id`.
1976    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1977        self.greeks.get(instrument_id).cloned()
1978    }
1979
1980    /// Adds exchange-provided option greeks to the cache.
1981    pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1982        log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1983        self.option_greeks.insert(greeks.instrument_id, greeks);
1984    }
1985
1986    /// Gets a reference to the exchange-provided option greeks for the `instrument_id`.
1987    #[must_use]
1988    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1989        self.option_greeks.get(instrument_id)
1990    }
1991
1992    /// Adds the `yield_curve` data to the cache.
1993    ///
1994    /// # Errors
1995    ///
1996    /// Returns an error if persisting the yield curve data to the backing database fails.
1997    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1998        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1999
2000        if self.config.save_market_data
2001            && let Some(_database) = &mut self.database
2002        {
2003            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
2004        }
2005
2006        self.yield_curves
2007            .insert(yield_curve.curve_name.clone(), yield_curve);
2008        Ok(())
2009    }
2010
2011    /// Gets the yield curve for the `key`.
2012    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
2013        self.yield_curves.get(key).map(|curve| {
2014            let curve_clone = curve.clone();
2015            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
2016                as Box<dyn Fn(f64) -> f64>
2017        })
2018    }
2019
2020    /// Adds the `currency` to the cache.
2021    ///
2022    /// # Errors
2023    ///
2024    /// Returns an error if persisting the currency to the backing database fails.
2025    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
2026        if self.currencies.contains_key(&currency.code) {
2027            return Ok(());
2028        }
2029        log::debug!("Adding `Currency` {}", currency.code);
2030
2031        if let Some(database) = &mut self.database {
2032            database.add_currency(&currency)?;
2033        }
2034
2035        self.currencies.insert(currency.code, currency);
2036        Ok(())
2037    }
2038
2039    /// Adds the `instrument` to the cache.
2040    ///
2041    /// # Errors
2042    ///
2043    /// Returns an error if persisting the instrument to the backing database fails.
2044    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
2045        log::debug!("Adding `Instrument` {}", instrument.id());
2046
2047        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
2048        if let Some(base_currency) = instrument.base_currency() {
2049            self.add_currency(base_currency)?;
2050        }
2051        self.add_currency(instrument.quote_currency())?;
2052        self.add_currency(instrument.settlement_currency())?;
2053
2054        if let Some(database) = &mut self.database {
2055            database.add_instrument(&instrument)?;
2056        }
2057
2058        self.instruments.insert(instrument.id(), instrument);
2059        Ok(())
2060    }
2061
2062    /// Adds the `synthetic` instrument to the cache.
2063    ///
2064    /// # Errors
2065    ///
2066    /// Returns an error if persisting the synthetic instrument to the backing database fails.
2067    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
2068        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
2069
2070        if let Some(database) = &mut self.database {
2071            database.add_synthetic(&synthetic)?;
2072        }
2073
2074        self.synthetics.insert(synthetic.id, synthetic);
2075        Ok(())
2076    }
2077
2078    /// Adds the `account` to the cache.
2079    ///
2080    /// # Errors
2081    ///
2082    /// Returns an error if persisting the account to the backing database fails.
2083    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
2084        log::debug!("Adding `Account` {}", account.id());
2085
2086        if let Some(database) = &mut self.database {
2087            database.add_account(&account)?;
2088        }
2089
2090        let account_id = account.id();
2091        self.accounts.insert(account_id, SharedCell::new(account));
2092        self.index
2093            .venue_account
2094            .insert(account_id.get_issuer(), account_id);
2095        Ok(())
2096    }
2097
2098    /// Indexes the `client_order_id` with the `venue_order_id`.
2099    ///
2100    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
2101    ///
2102    /// # Errors
2103    ///
2104    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
2105    pub fn add_venue_order_id(
2106        &mut self,
2107        client_order_id: &ClientOrderId,
2108        venue_order_id: &VenueOrderId,
2109        overwrite: bool,
2110    ) -> anyhow::Result<()> {
2111        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
2112            && !overwrite
2113            && existing_venue_order_id != venue_order_id
2114        {
2115            anyhow::bail!(
2116                "Existing {existing_venue_order_id} for {client_order_id}
2117                    did not match the given {venue_order_id}.
2118                    If you are writing a test then try a different `venue_order_id`,
2119                    otherwise this is probably a bug."
2120            );
2121        }
2122
2123        self.index
2124            .client_order_ids
2125            .insert(*client_order_id, *venue_order_id);
2126        self.index
2127            .venue_order_ids
2128            .insert(*venue_order_id, *client_order_id);
2129
2130        Ok(())
2131    }
2132
2133    /// Adds the `order` to the cache indexed with any given identifiers.
2134    ///
2135    /// # Parameters
2136    ///
2137    /// `override_existing`: If the added order should 'override' any existing order and replace
2138    /// it in the cache. This is currently used for emulated orders which are
2139    /// being released and transformed into another type.
2140    ///
2141    /// # Errors
2142    ///
2143    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
2144    pub fn add_order(
2145        &mut self,
2146        order: OrderAny,
2147        position_id: Option<PositionId>,
2148        client_id: Option<ClientId>,
2149        replace_existing: bool,
2150    ) -> anyhow::Result<()> {
2151        let instrument_id = order.instrument_id();
2152        let venue = instrument_id.venue;
2153        let client_order_id = order.client_order_id();
2154        let strategy_id = order.strategy_id();
2155        let exec_algorithm_id = order.exec_algorithm_id();
2156        let exec_spawn_id = order.exec_spawn_id();
2157
2158        if !replace_existing {
2159            check_key_not_in_map(
2160                &client_order_id,
2161                &self.orders,
2162                stringify!(client_order_id),
2163                stringify!(orders),
2164            )?;
2165        }
2166
2167        log::debug!("Adding {order:?}");
2168
2169        self.index.orders.insert(client_order_id);
2170
2171        if order.is_active_local() {
2172            self.index.orders_active_local.insert(client_order_id);
2173        }
2174        self.index
2175            .order_strategy
2176            .insert(client_order_id, strategy_id);
2177        self.index.strategies.insert(strategy_id);
2178
2179        // Update venue -> orders index
2180        self.index
2181            .venue_orders
2182            .entry(venue)
2183            .or_default()
2184            .insert(client_order_id);
2185
2186        // Update instrument -> orders index
2187        self.index
2188            .instrument_orders
2189            .entry(instrument_id)
2190            .or_default()
2191            .insert(client_order_id);
2192
2193        // Update strategy -> orders index
2194        self.index
2195            .strategy_orders
2196            .entry(strategy_id)
2197            .or_default()
2198            .insert(client_order_id);
2199
2200        // Update account -> orders index (if account_id known at creation)
2201        if let Some(account_id) = order.account_id() {
2202            self.index
2203                .account_orders
2204                .entry(account_id)
2205                .or_default()
2206                .insert(client_order_id);
2207        }
2208
2209        // Update exec_algorithm -> orders index
2210        if let Some(exec_algorithm_id) = exec_algorithm_id {
2211            self.index.exec_algorithms.insert(exec_algorithm_id);
2212
2213            self.index
2214                .exec_algorithm_orders
2215                .entry(exec_algorithm_id)
2216                .or_default()
2217                .insert(client_order_id);
2218        }
2219
2220        // Update exec_spawn -> orders index
2221        if let Some(exec_spawn_id) = exec_spawn_id {
2222            self.index
2223                .exec_spawn_orders
2224                .entry(exec_spawn_id)
2225                .or_default()
2226                .insert(client_order_id);
2227        }
2228
2229        // Update emulation index
2230        if let Some(emulation_trigger) = order.emulation_trigger()
2231            && emulation_trigger != TriggerType::NoTrigger
2232        {
2233            self.index.orders_emulated.insert(client_order_id);
2234        }
2235
2236        // Index position ID if provided
2237        if let Some(position_id) = position_id {
2238            self.add_position_id(
2239                &position_id,
2240                &order.instrument_id().venue,
2241                &client_order_id,
2242                &strategy_id,
2243            )?;
2244        }
2245
2246        // Index client ID if provided
2247        if let Some(client_id) = client_id {
2248            self.index.order_client.insert(client_order_id, client_id);
2249            log::debug!("Indexed {client_id:?}");
2250        }
2251
2252        if let Some(database) = &mut self.database {
2253            database.add_order(&order, client_id)?;
2254            // TODO: Implement
2255            // if self.config.snapshot_orders {
2256            //     database.snapshot_order_state(order)?;
2257            // }
2258        }
2259
2260        match self.orders.get(&client_order_id) {
2261            // Reuse the existing cell on replace so the canonical entry stays in place
2262            // rather than orphaning a stale cell.
2263            Some(order_cell) => *order_cell.borrow_mut() = order,
2264            None => {
2265                self.orders.insert(client_order_id, SharedCell::new(order));
2266            }
2267        }
2268
2269        Ok(())
2270    }
2271
2272    /// Adds the `order_list` to the cache.
2273    ///
2274    /// # Errors
2275    ///
2276    /// Returns an error if the order list ID is already contained in the cache.
2277    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
2278        let order_list_id = order_list.id;
2279        check_key_not_in_map(
2280            &order_list_id,
2281            &self.order_lists,
2282            stringify!(order_list_id),
2283            stringify!(order_lists),
2284        )?;
2285
2286        log::debug!("Adding {order_list:?}");
2287        self.order_lists.insert(order_list_id, order_list);
2288        Ok(())
2289    }
2290
2291    /// Indexes the `position_id` with the other given IDs.
2292    ///
2293    /// # Errors
2294    ///
2295    /// Returns an error if indexing position ID in the backing database fails.
2296    pub fn add_position_id(
2297        &mut self,
2298        position_id: &PositionId,
2299        venue: &Venue,
2300        client_order_id: &ClientOrderId,
2301        strategy_id: &StrategyId,
2302    ) -> anyhow::Result<()> {
2303        self.index
2304            .order_position
2305            .insert(*client_order_id, *position_id);
2306
2307        // Index: ClientOrderId -> PositionId
2308        if let Some(database) = &mut self.database {
2309            database.index_order_position(*client_order_id, *position_id)?;
2310        }
2311
2312        // Index: PositionId -> StrategyId
2313        self.index
2314            .position_strategy
2315            .insert(*position_id, *strategy_id);
2316
2317        // Index: PositionId -> set[ClientOrderId]
2318        self.index
2319            .position_orders
2320            .entry(*position_id)
2321            .or_default()
2322            .insert(*client_order_id);
2323
2324        // Index: StrategyId -> set[PositionId]
2325        self.index
2326            .strategy_positions
2327            .entry(*strategy_id)
2328            .or_default()
2329            .insert(*position_id);
2330
2331        // Index: Venue -> set[PositionId]
2332        self.index
2333            .venue_positions
2334            .entry(*venue)
2335            .or_default()
2336            .insert(*position_id);
2337
2338        Ok(())
2339    }
2340
2341    // Propagates parent OTO `position_id` to contingent children that are missing one.
2342    //
2343    // Recovers from a partial-write window during fill handling: the fill-time path in the
2344    // execution engine assigns `position_id` to each contingent child in a non-atomic loop
2345    // (`set_position_id` then `add_position_id`), so a crash mid-loop can leave the database
2346    // with the parent updated and some children un-updated. This pass re-applies any missing
2347    // assignments after load. Mirrors the Cython behaviour at
2348    // `nautilus_trader/cache/cache.pyx::_assign_position_id_to_contingencies`.
2349    fn assign_position_ids_to_contingencies(&mut self) {
2350        let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2351
2352        for parent_order_cell in self.orders.values() {
2353            let parent = parent_order_cell.borrow();
2354            if parent.contingency_type() != Some(ContingencyType::Oto) {
2355                continue;
2356            }
2357            let Some(parent_position_id) = parent.position_id() else {
2358                continue;
2359            };
2360            let Some(linked_order_ids) = parent.linked_order_ids() else {
2361                continue;
2362            };
2363
2364            for client_order_id in linked_order_ids {
2365                match self.orders.get(client_order_id) {
2366                    None => {
2367                        log::error!("Contingency order {client_order_id} not found");
2368                    }
2369                    Some(contingent_order_cell) => {
2370                        if contingent_order_cell.borrow().position_id().is_none() {
2371                            assignments.push((parent_position_id, *client_order_id));
2372                        }
2373                    }
2374                }
2375            }
2376        }
2377
2378        for (position_id, client_order_id) in assignments {
2379            let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
2380                let mut contingent = order_cell.borrow_mut();
2381                contingent.set_position_id(Some(position_id));
2382                (contingent.instrument_id().venue, contingent.strategy_id())
2383            }) else {
2384                continue;
2385            };
2386
2387            // In-memory index updates only. The persistent index entry (if any) was written by
2388            // the original fill-time `add_position_id` call; replaying the database write here
2389            // would invoke `CacheDatabaseAdapter::index_order_position`, which is currently
2390            // `todo!()` on both the Redis and SQL adapters. Until those land, the load-time
2391            // recovery is in-memory-only: sufficient for the current process to operate, but
2392            // not durable across another restart.
2393            self.index
2394                .order_position
2395                .insert(client_order_id, position_id);
2396            self.index
2397                .position_strategy
2398                .insert(position_id, strategy_id);
2399            self.index
2400                .position_orders
2401                .entry(position_id)
2402                .or_default()
2403                .insert(client_order_id);
2404            self.index
2405                .strategy_positions
2406                .entry(strategy_id)
2407                .or_default()
2408                .insert(position_id);
2409            self.index
2410                .venue_positions
2411                .entry(venue)
2412                .or_default()
2413                .insert(position_id);
2414        }
2415    }
2416
2417    /// Adds the `position` to the cache.
2418    ///
2419    /// # Errors
2420    ///
2421    /// Returns an error if persisting the position to the backing database fails.
2422    pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2423        self.positions
2424            .insert(position.id, SharedCell::new(position.clone()));
2425        self.index.positions.insert(position.id);
2426        self.index.positions_open.insert(position.id);
2427        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
2428
2429        log::debug!("Adding {position}");
2430
2431        self.add_position_id(
2432            &position.id,
2433            &position.instrument_id.venue,
2434            &position.opening_order_id,
2435            &position.strategy_id,
2436        )?;
2437
2438        let venue = position.instrument_id.venue;
2439        let venue_positions = self.index.venue_positions.entry(venue).or_default();
2440        venue_positions.insert(position.id);
2441
2442        // Index: InstrumentId -> AHashSet
2443        let instrument_id = position.instrument_id;
2444        let instrument_positions = self
2445            .index
2446            .instrument_positions
2447            .entry(instrument_id)
2448            .or_default();
2449        instrument_positions.insert(position.id);
2450
2451        // Index: AccountId -> AHashSet<PositionId>
2452        self.index
2453            .account_positions
2454            .entry(position.account_id)
2455            .or_default()
2456            .insert(position.id);
2457
2458        if let Some(database) = &mut self.database {
2459            database.add_position(position)?;
2460            // TODO: Implement position snapshots
2461            // if self.snapshot_positions {
2462            //     database.snapshot_position_state(
2463            //         position,
2464            //         position.ts_last,
2465            //         self.calculate_unrealized_pnl(&position),
2466            //     )?;
2467            // }
2468        }
2469
2470        Ok(())
2471    }
2472
2473    /// Updates the `account` in the cache.
2474    ///
2475    /// Reuses the existing cell when present so any held [`AccountRef`] handles continue to point
2476    /// at the canonical entry; only inserts a new cell when the account is unknown.
2477    ///
2478    /// # Errors
2479    ///
2480    /// Returns an error if updating the account in the database fails.
2481    pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2482        let account_id = account.id();
2483        match self.accounts.get(&account_id) {
2484            Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
2485            None => {
2486                self.accounts
2487                    .insert(account_id, SharedCell::new(account.clone()));
2488            }
2489        }
2490
2491        if let Some(database) = &mut self.database {
2492            database.update_account(account)?;
2493        }
2494        Ok(())
2495    }
2496
2497    /// Removes the `account` from the cache and returns it.
2498    ///
2499    /// This supports hot paths which need owned account mutation without
2500    /// cloning the account event history. The cache is the sole owner of the
2501    /// account cell (the field is private and accessors only hand out
2502    /// lifetime-scoped [`AccountRef`] borrows), so the value is moved out of
2503    /// its cell rather than cloned.
2504    ///
2505    /// # Panics
2506    ///
2507    /// Panics if the cache no longer holds the only strong handle to the
2508    /// account cell. This indicates an internal invariant violation: some
2509    /// component cloned the underlying [`SharedCell`] and held it past the
2510    /// scope of a single cache method.
2511    #[must_use]
2512    pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
2513        self.accounts.remove(account_id).map(|cell| {
2514            let rc: Rc<RefCell<AccountAny>> = cell.into();
2515            Rc::try_unwrap(rc).map_or_else(
2516                |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
2517                RefCell::into_inner,
2518            )
2519        })
2520    }
2521
2522    /// Caches the `account` in memory without updating the database.
2523    pub fn cache_account_owned(&mut self, account: AccountAny) {
2524        let account_id = account.id();
2525        self.index
2526            .venue_account
2527            .insert(account_id.get_issuer(), account_id);
2528        match self.accounts.get(&account_id) {
2529            Some(account_cell) => *account_cell.borrow_mut() = account,
2530            None => {
2531                self.accounts.insert(account_id, SharedCell::new(account));
2532            }
2533        }
2534    }
2535
2536    /// Updates the `account` in the cache, taking ownership of the updated account.
2537    ///
2538    /// # Errors
2539    ///
2540    /// Returns an error if updating the account in the database fails.
2541    pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
2542        let account_id = account.id();
2543        self.cache_account_owned(account);
2544
2545        if let Some(database) = &mut self.database {
2546            let Some(account_cell) = self.accounts.get(&account_id) else {
2547                anyhow::bail!("Account {account_id} not found after cache update");
2548            };
2549            database.update_account(&account_cell.borrow())?;
2550        }
2551        Ok(())
2552    }
2553
2554    /// Applies an account state event to the cached account.
2555    ///
2556    /// Mutates the cached account in place to avoid cloning the account event
2557    /// history on the hot path; long-running sessions accumulate many events
2558    /// per account, so a snapshot-clone here would be O(history) per update.
2559    ///
2560    /// # Errors
2561    ///
2562    /// Returns an error if applying or persisting the account state fails.
2563    pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
2564        let Some(cell) = self.accounts.get(&event.account_id) else {
2565            return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
2566        };
2567
2568        cell.borrow_mut().apply(event.clone())?;
2569
2570        if let Some(database) = &mut self.database {
2571            database.update_account(&cell.borrow())?;
2572        }
2573        Ok(())
2574    }
2575
2576    /// Replaces the cached `order` from a non-event snapshot.
2577    ///
2578    /// Prefer [`Self::update_order`] for lifecycle state changes. Use this only for order state
2579    /// that is not represented by [`OrderEventAny`].
2580    ///
2581    /// # Errors
2582    ///
2583    /// Returns an error if updating the order indexes or database fails.
2584    pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2585        self.refresh_order(order)?;
2586
2587        let client_order_id = order.client_order_id();
2588        match self.orders.get(&client_order_id) {
2589            // Reuse the existing cell so the canonical entry stays in place rather than
2590            // orphaning a stale cell.
2591            Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
2592            None => {
2593                self.orders
2594                    .insert(client_order_id, SharedCell::new(order.clone()));
2595            }
2596        }
2597
2598        Ok(())
2599    }
2600
2601    /// Updates the cached order by applying an event and refreshing derived cache state.
2602    ///
2603    /// # Errors
2604    ///
2605    /// Returns an error if the order is not found or rejects the event.
2606    pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
2607        let event_client_order_id = event.client_order_id();
2608        let client_order_id = if self.order_exists(&event_client_order_id) {
2609            event_client_order_id
2610        } else if let Some(venue_order_id) = event.venue_order_id() {
2611            self.index
2612                .venue_order_ids
2613                .get(&venue_order_id)
2614                .copied()
2615                .ok_or(OrderError::NotFound(event_client_order_id))?
2616        } else {
2617            return Err(OrderError::NotFound(event_client_order_id).into());
2618        };
2619
2620        let order_cell = self
2621            .orders
2622            .get(&client_order_id)
2623            .cloned()
2624            .ok_or(OrderError::NotFound(client_order_id))?;
2625
2626        // Apply on a snapshot first so a fallible `apply` (e.g. invalid state
2627        // transition) leaves the canonical cell untouched. On success we swap the
2628        // post-event value back into the cell so subsequent reads see the new state.
2629        let mut snapshot = order_cell.borrow().clone();
2630        snapshot.apply(event.clone())?;
2631        *order_cell.borrow_mut() = snapshot.clone();
2632
2633        if let Err(e) = self.refresh_order(&snapshot) {
2634            log::error!("Error updating order in cache: {e}");
2635        }
2636
2637        Ok(snapshot)
2638    }
2639
2640    fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2641        let client_order_id = order.client_order_id();
2642
2643        if order.is_active_local() {
2644            self.index.orders_active_local.insert(client_order_id);
2645        } else {
2646            self.index.orders_active_local.remove(&client_order_id);
2647        }
2648
2649        // Update venue order ID
2650        if let Some(venue_order_id) = order.venue_order_id() {
2651            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2652            // venues which use a cancel+replace update strategy.
2653            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2654                let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
2655                if let Err(e) =
2656                    self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
2657                {
2658                    log::error!("Error indexing venue order ID in cache: {e}");
2659                }
2660            }
2661        }
2662
2663        // Update in-flight state
2664        if order.is_inflight() {
2665            self.index.orders_inflight.insert(client_order_id);
2666        } else {
2667            self.index.orders_inflight.remove(&client_order_id);
2668        }
2669
2670        // Update open/closed state
2671        if order.is_open() {
2672            self.index.orders_closed.remove(&client_order_id);
2673            self.index.orders_open.insert(client_order_id);
2674        } else if order.is_closed() {
2675            self.index.orders_open.remove(&client_order_id);
2676            self.index.orders_pending_cancel.remove(&client_order_id);
2677            self.index.orders_closed.insert(client_order_id);
2678        }
2679
2680        // Update emulation index
2681        if let Some(emulation_trigger) = order.emulation_trigger()
2682            && emulation_trigger != TriggerType::NoTrigger
2683            && !order.is_closed()
2684        {
2685            self.index.orders_emulated.insert(client_order_id);
2686        } else {
2687            self.index.orders_emulated.remove(&client_order_id);
2688        }
2689
2690        // Update account orders index when account_id becomes available
2691        if let Some(account_id) = order.account_id() {
2692            self.index
2693                .account_orders
2694                .entry(account_id)
2695                .or_default()
2696                .insert(client_order_id);
2697        }
2698
2699        // Update own book
2700        if !self.own_books.is_empty() {
2701            let own_book = self.own_order_book(&order.instrument_id());
2702            if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
2703                self.update_own_order_book(order);
2704            }
2705        }
2706
2707        if let Some(database) = &mut self.database {
2708            database.update_order(order.last_event())?;
2709            // TODO: Implement order snapshots
2710            // if self.snapshot_orders {
2711            //     database.snapshot_order_state(order)?;
2712            // }
2713        }
2714
2715        Ok(())
2716    }
2717
2718    /// Updates the `order` as pending cancel locally.
2719    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2720        self.index
2721            .orders_pending_cancel
2722            .insert(order.client_order_id());
2723    }
2724
2725    /// Updates the `position` in the cache.
2726    ///
2727    /// Reuses the existing cell when present so any held [`PositionRef`] handles continue to point
2728    /// at the canonical entry; only inserts a new cell when the position is unknown.
2729    ///
2730    /// # Errors
2731    ///
2732    /// Returns an error if updating the position in the database fails.
2733    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2734        // Update open/closed state
2735
2736        if position.is_open() {
2737            self.index.positions_open.insert(position.id);
2738            self.index.positions_closed.remove(&position.id);
2739        } else {
2740            self.index.positions_closed.insert(position.id);
2741            self.index.positions_open.remove(&position.id);
2742        }
2743
2744        if let Some(database) = &mut self.database {
2745            database.update_position(position)?;
2746            // TODO: Implement order snapshots
2747            // if self.snapshot_orders {
2748            //     database.snapshot_order_state(order)?;
2749            // }
2750        }
2751
2752        match self.positions.get(&position.id) {
2753            Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
2754            None => {
2755                self.positions
2756                    .insert(position.id, SharedCell::new(position.clone()));
2757            }
2758        }
2759
2760        Ok(())
2761    }
2762
2763    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2764    /// serializing it, and storing it in the position snapshots.
2765    ///
2766    /// # Errors
2767    ///
2768    /// Returns an error if serializing or storing the position snapshot fails.
2769    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
2770        let position_id = position.id;
2771
2772        let mut copied_position = position.clone();
2773        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2774        copied_position.id = PositionId::new(new_id);
2775
2776        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2777        let position_serialized = serde_json::to_vec(&copied_position)?;
2778        let snapshot_index = self.position_snapshot_count(&position_id);
2779        let blob_ref = format!(
2780            "cache://position-snapshots/{}/{}",
2781            position_id.as_str(),
2782            snapshot_index,
2783        );
2784        let snapshot_blob = Bytes::from(position_serialized);
2785
2786        self.add(&blob_ref, snapshot_blob.clone())?;
2787        self.position_snapshots
2788            .entry(position_id)
2789            .or_default()
2790            .push(snapshot_blob.clone());
2791
2792        log::debug!("Snapshot {copied_position}");
2793        Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
2794    }
2795
2796    /// Loads the cache-owned snapshot blob stored under `blob_ref`.
2797    ///
2798    /// The cache first checks in-memory snapshot state. When the blob is not present and a
2799    /// database adapter exists, the generic cache entries are loaded and checked for the same
2800    /// opaque reference.
2801    ///
2802    /// # Errors
2803    ///
2804    /// Returns an error if loading generic cache entries from the backing database fails.
2805    pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
2806        if let Some(blob) = self.snapshot_blob(blob_ref) {
2807            return Ok(Some(blob));
2808        }
2809
2810        if self.database.is_some() {
2811            self.cache_general()?;
2812        }
2813
2814        Ok(self.snapshot_blob(blob_ref))
2815    }
2816
2817    /// Restores the cache-owned snapshot blob stored under `blob_ref`.
2818    ///
2819    /// Only cache-owned `cache://position-snapshots/...` blobs are currently supported.
2820    ///
2821    /// # Errors
2822    ///
2823    /// Returns an error if the blob reference is unsupported, malformed, skips earlier
2824    /// snapshot frames, conflicts with an existing frame, or does not decode to the expected
2825    /// position snapshot.
2826    pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
2827        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
2828        validate_position_snapshot_blob(&position_id, blob.as_ref())?;
2829
2830        let frames = self.position_snapshots.entry(position_id).or_default();
2831        match frames.get(snapshot_index) {
2832            Some(existing) if existing == &blob => {}
2833            Some(_) => {
2834                anyhow::bail!(
2835                    "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
2836                );
2837            }
2838            None if frames.len() == snapshot_index => frames.push(blob.clone()),
2839            None => {
2840                anyhow::bail!(
2841                    "position snapshot blob_ref {blob_ref} skips missing frame {}",
2842                    frames.len()
2843                );
2844            }
2845        }
2846
2847        self.general.insert(blob_ref.to_string(), blob);
2848        Ok(())
2849    }
2850
2851    fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
2852        if let Some(blob) = self.general.get(blob_ref) {
2853            return Some(blob.clone());
2854        }
2855
2856        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
2857        self.position_snapshots
2858            .get(&position_id)
2859            .and_then(|frames| frames.get(snapshot_index))
2860            .cloned()
2861    }
2862
2863    /// Creates a snapshot of the `position` state in the database.
2864    ///
2865    /// # Errors
2866    ///
2867    /// Returns an error if snapshotting the position state fails.
2868    pub fn snapshot_position_state(
2869        &mut self,
2870        position: &Position,
2871        // ts_snapshot: u64,
2872        // unrealized_pnl: Option<Money>,
2873        open_only: Option<bool>,
2874    ) -> anyhow::Result<()> {
2875        let open_only = open_only.unwrap_or(true);
2876
2877        if open_only && !position.is_open() {
2878            return Ok(());
2879        }
2880
2881        if let Some(database) = &mut self.database {
2882            database.snapshot_position_state(position).map_err(|e| {
2883                log::error!(
2884                    "Failed to snapshot position state for {}: {e:?}",
2885                    position.id
2886                );
2887                e
2888            })?;
2889        } else {
2890            log::warn!(
2891                "Cannot snapshot position state for {} (no database configured)",
2892                position.id
2893            );
2894        }
2895
2896        // Ok(())
2897        todo!()
2898    }
2899
2900    /// Gets the OMS type for the `position_id`.
2901    #[must_use]
2902    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2903        // Get OMS type from the index
2904        if self.index.position_strategy.contains_key(position_id) {
2905            // For now, we'll default to NETTING
2906            // TODO: Store and retrieve actual OMS type per position
2907            Some(OmsType::Netting)
2908        } else {
2909            None
2910        }
2911    }
2912
2913    /// Gets the serialized position snapshot frames for the `position_id`.
2914    ///
2915    /// Each element in the returned vector is one JSON-encoded [`Position`] snapshot,
2916    /// in the order they were taken.
2917    #[must_use]
2918    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2919        self.position_snapshots
2920            .get(position_id)
2921            .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2922    }
2923
2924    /// Returns the number of stored snapshot frames for the `position_id`.
2925    ///
2926    /// Returns `0` when no frames are stored. Does not allocate or copy frame bytes.
2927    #[must_use]
2928    pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2929        self.position_snapshots.get(position_id).map_or(0, Vec::len)
2930    }
2931
2932    /// Returns all position snapshots with the given optional filters.
2933    ///
2934    /// When `position_id` is `Some`, only snapshots for that position are returned.
2935    /// When `account_id` is `Some`, snapshots are filtered to that account.
2936    /// Frames that fail to deserialize are skipped with a warning.
2937    #[must_use]
2938    pub fn position_snapshots(
2939        &self,
2940        position_id: Option<&PositionId>,
2941        account_id: Option<&AccountId>,
2942    ) -> Vec<Position> {
2943        let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2944            Some(pid) => match self.position_snapshots.get(pid) {
2945                Some(v) => Box::new(v.iter()),
2946                None => Box::new(std::iter::empty()),
2947            },
2948            None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2949        };
2950
2951        let mut results: Vec<Position> = frames
2952            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2953                Ok(position) => Some(position),
2954                Err(e) => {
2955                    log::warn!("Failed to decode position snapshot: {e}");
2956                    None
2957                }
2958            })
2959            .collect();
2960
2961        if let Some(aid) = account_id {
2962            results.retain(|p| p.account_id == *aid);
2963        }
2964
2965        results
2966    }
2967
2968    /// Returns position snapshots for `position_id` starting from the `skip`th frame.
2969    ///
2970    /// Use this to deserialize only newly appended snapshots when the caller already
2971    /// processed earlier frames. Returns an empty vector when no frames or fewer than
2972    /// `skip` frames are stored. Frames that fail to deserialize are skipped with a warning.
2973    #[must_use]
2974    pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2975        let Some(frames) = self.position_snapshots.get(position_id) else {
2976            return Vec::new();
2977        };
2978
2979        frames
2980            .iter()
2981            .skip(skip)
2982            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2983                Ok(position) => Some(position),
2984                Err(e) => {
2985                    log::warn!("Failed to decode position snapshot: {e}");
2986                    None
2987                }
2988            })
2989            .collect()
2990    }
2991
2992    /// Gets position snapshot IDs for the `instrument_id`.
2993    #[must_use]
2994    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2995        // Get snapshot position IDs that match the instrument
2996        let mut result = AHashSet::new();
2997
2998        for (position_id, _) in &self.position_snapshots {
2999            // Check if this position is for the requested instrument
3000            if let Some(position_cell) = self.positions.get(position_id)
3001                && position_cell.borrow().instrument_id == *instrument_id
3002            {
3003                result.insert(*position_id);
3004            }
3005        }
3006        result
3007    }
3008
3009    /// Snapshots the `order` state in the database.
3010    ///
3011    /// # Errors
3012    ///
3013    /// Returns an error if snapshotting the order state fails.
3014    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
3015        let Some(database) = &self.database else {
3016            log::warn!(
3017                "Cannot snapshot order state for {} (no database configured)",
3018                order.client_order_id()
3019            );
3020            return Ok(());
3021        };
3022
3023        database.snapshot_order_state(order)
3024    }
3025
3026    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
3027
3028    // Collects references to the index sets that constrain an order query.
3029    //
3030    // Returns:
3031    // - `FilterSources::Unfiltered` when no filter is provided (the caller should iterate
3032    //   the full bucket).
3033    // - `FilterSources::Empty` when a filter is provided but the index has no entry for it
3034    //   (the resolved set is unconditionally empty, no further work needed).
3035    // - `FilterSources::Sets` with borrowed references to each filter source set.
3036    fn collect_order_filter_sources<'a>(
3037        &'a self,
3038        venue: Option<&Venue>,
3039        instrument_id: Option<&InstrumentId>,
3040        strategy_id: Option<&StrategyId>,
3041        account_id: Option<&AccountId>,
3042    ) -> FilterSources<'a, ClientOrderId> {
3043        let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
3044
3045        if let Some(venue) = venue {
3046            match self.index.venue_orders.get(venue) {
3047                Some(set) => sources.push(set),
3048                None => return FilterSources::Empty,
3049            }
3050        }
3051
3052        if let Some(instrument_id) = instrument_id {
3053            match self.index.instrument_orders.get(instrument_id) {
3054                Some(set) => sources.push(set),
3055                None => return FilterSources::Empty,
3056            }
3057        }
3058
3059        if let Some(strategy_id) = strategy_id {
3060            match self.index.strategy_orders.get(strategy_id) {
3061                Some(set) => sources.push(set),
3062                None => return FilterSources::Empty,
3063            }
3064        }
3065
3066        if let Some(account_id) = account_id {
3067            match self.index.account_orders.get(account_id) {
3068                Some(set) => sources.push(set),
3069                None => return FilterSources::Empty,
3070            }
3071        }
3072
3073        if sources.is_empty() {
3074            FilterSources::Unfiltered
3075        } else {
3076            FilterSources::Sets(sources)
3077        }
3078    }
3079
3080    fn collect_position_filter_sources<'a>(
3081        &'a self,
3082        venue: Option<&Venue>,
3083        instrument_id: Option<&InstrumentId>,
3084        strategy_id: Option<&StrategyId>,
3085        account_id: Option<&AccountId>,
3086    ) -> FilterSources<'a, PositionId> {
3087        let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
3088
3089        if let Some(venue) = venue {
3090            match self.index.venue_positions.get(venue) {
3091                Some(set) => sources.push(set),
3092                None => return FilterSources::Empty,
3093            }
3094        }
3095
3096        if let Some(instrument_id) = instrument_id {
3097            match self.index.instrument_positions.get(instrument_id) {
3098                Some(set) => sources.push(set),
3099                None => return FilterSources::Empty,
3100            }
3101        }
3102
3103        if let Some(strategy_id) = strategy_id {
3104            match self.index.strategy_positions.get(strategy_id) {
3105                Some(set) => sources.push(set),
3106                None => return FilterSources::Empty,
3107            }
3108        }
3109
3110        if let Some(account_id) = account_id {
3111            match self.index.account_positions.get(account_id) {
3112                Some(set) => sources.push(set),
3113                None => return FilterSources::Empty,
3114            }
3115        }
3116
3117        if sources.is_empty() {
3118            FilterSources::Unfiltered
3119        } else {
3120            FilterSources::Sets(sources)
3121        }
3122    }
3123
3124    // Materializes the `ClientOrderId`s in `bucket` matching the optional filter parameters.
3125    //
3126    // Folds the bucket into the filter sources and runs a single size-ordered intersection,
3127    // avoiding the legacy two-step build-filter-set + bucket-intersection that allocated and
3128    // rehashed twice.
3129    fn query_orders_in_bucket(
3130        &self,
3131        bucket: &AHashSet<ClientOrderId>,
3132        venue: Option<&Venue>,
3133        instrument_id: Option<&InstrumentId>,
3134        strategy_id: Option<&StrategyId>,
3135        account_id: Option<&AccountId>,
3136    ) -> AHashSet<ClientOrderId> {
3137        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3138            FilterSources::Empty => AHashSet::new(),
3139            FilterSources::Unfiltered => bucket.clone(),
3140            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3141        }
3142    }
3143
3144    fn query_positions_in_bucket(
3145        &self,
3146        bucket: &AHashSet<PositionId>,
3147        venue: Option<&Venue>,
3148        instrument_id: Option<&InstrumentId>,
3149        strategy_id: Option<&StrategyId>,
3150        account_id: Option<&AccountId>,
3151    ) -> AHashSet<PositionId> {
3152        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3153            FilterSources::Empty => AHashSet::new(),
3154            FilterSources::Unfiltered => bucket.clone(),
3155            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
3156        }
3157    }
3158
3159    // Returns a borrowed or owned view of the orders in `bucket` matching the optional filter
3160    // parameters. Avoids cloning the bucket when no filter narrows it.
3161    fn view_orders_in_bucket<'a>(
3162        &'a self,
3163        bucket: &'a AHashSet<ClientOrderId>,
3164        venue: Option<&Venue>,
3165        instrument_id: Option<&InstrumentId>,
3166        strategy_id: Option<&StrategyId>,
3167        account_id: Option<&AccountId>,
3168    ) -> Cow<'a, AHashSet<ClientOrderId>> {
3169        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3170            FilterSources::Empty => Cow::Owned(AHashSet::new()),
3171            FilterSources::Unfiltered => Cow::Borrowed(bucket),
3172            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3173        }
3174    }
3175
3176    fn view_positions_in_bucket<'a>(
3177        &'a self,
3178        bucket: &'a AHashSet<PositionId>,
3179        venue: Option<&Venue>,
3180        instrument_id: Option<&InstrumentId>,
3181        strategy_id: Option<&StrategyId>,
3182        account_id: Option<&AccountId>,
3183    ) -> Cow<'a, AHashSet<PositionId>> {
3184        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3185            FilterSources::Empty => Cow::Owned(AHashSet::new()),
3186            FilterSources::Unfiltered => Cow::Borrowed(bucket),
3187            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
3188        }
3189    }
3190
3191    // Returns a lazy iterator yielding the [`ClientOrderId`]s in `bucket` matching the optional
3192    // filter parameters. Avoids any [`Vec`] or [`AHashSet`] materialization in the result path,
3193    // and (for multi-filter calls) drives intersection from the smallest source while looking
3194    // up membership in the rest.
3195    fn iter_orders_in_bucket<'a>(
3196        &'a self,
3197        bucket: &'a AHashSet<ClientOrderId>,
3198        venue: Option<&Venue>,
3199        instrument_id: Option<&InstrumentId>,
3200        strategy_id: Option<&StrategyId>,
3201        account_id: Option<&AccountId>,
3202    ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
3203        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3204            FilterSources::Empty => Box::new(std::iter::empty()),
3205            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3206            FilterSources::Sets(mut sources) => {
3207                sources.push(bucket);
3208                sources.sort_unstable_by_key(|s| s.len());
3209                let driver = sources[0];
3210                let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
3211                Box::new(
3212                    driver
3213                        .iter()
3214                        .copied()
3215                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
3216                )
3217            }
3218        }
3219    }
3220
3221    fn iter_positions_in_bucket<'a>(
3222        &'a self,
3223        bucket: &'a AHashSet<PositionId>,
3224        venue: Option<&Venue>,
3225        instrument_id: Option<&InstrumentId>,
3226        strategy_id: Option<&StrategyId>,
3227        account_id: Option<&AccountId>,
3228    ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
3229        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3230            FilterSources::Empty => Box::new(std::iter::empty()),
3231            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
3232            FilterSources::Sets(mut sources) => {
3233                sources.push(bucket);
3234                sources.sort_unstable_by_key(|s| s.len());
3235                let driver = sources[0];
3236                let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
3237                Box::new(
3238                    driver
3239                        .iter()
3240                        .copied()
3241                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
3242                )
3243            }
3244        }
3245    }
3246
3247    // Counts orders in `bucket` matching the optional filter parameters.
3248    //
3249    // Drives intersection from the smallest filter source (or the bucket itself when no filter
3250    // is provided) and short-circuits by counting rather than collecting. With a side filter,
3251    // each candidate order is borrowed via its cell only long enough to inspect the side.
3252    fn count_orders_in_bucket(
3253        &self,
3254        bucket: &AHashSet<ClientOrderId>,
3255        venue: Option<&Venue>,
3256        instrument_id: Option<&InstrumentId>,
3257        strategy_id: Option<&StrategyId>,
3258        account_id: Option<&AccountId>,
3259        side: Option<OrderSide>,
3260    ) -> usize {
3261        let side = side.unwrap_or(OrderSide::NoOrderSide);
3262
3263        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3264            FilterSources::Empty => 0,
3265            FilterSources::Unfiltered => {
3266                if side == OrderSide::NoOrderSide {
3267                    bucket.len()
3268                } else {
3269                    bucket
3270                        .iter()
3271                        .filter(|id| self.order_side_matches(id, side))
3272                        .count()
3273                }
3274            }
3275            FilterSources::Sets(mut sources) => {
3276                sources.push(bucket);
3277                sources.sort_unstable_by_key(|s| s.len());
3278                let driver = sources[0];
3279                let rest = &sources[1..];
3280
3281                driver
3282                    .iter()
3283                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3284                    .filter(|id| {
3285                        side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
3286                    })
3287                    .count()
3288            }
3289        }
3290    }
3291
3292    fn count_positions_in_bucket(
3293        &self,
3294        bucket: &AHashSet<PositionId>,
3295        venue: Option<&Venue>,
3296        instrument_id: Option<&InstrumentId>,
3297        strategy_id: Option<&StrategyId>,
3298        account_id: Option<&AccountId>,
3299        side: Option<PositionSide>,
3300    ) -> usize {
3301        let side = side.unwrap_or(PositionSide::NoPositionSide);
3302
3303        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3304            FilterSources::Empty => 0,
3305            FilterSources::Unfiltered => {
3306                if side == PositionSide::NoPositionSide {
3307                    bucket.len()
3308                } else {
3309                    bucket
3310                        .iter()
3311                        .filter(|id| self.position_side_matches(id, side))
3312                        .count()
3313                }
3314            }
3315            FilterSources::Sets(mut sources) => {
3316                sources.push(bucket);
3317                sources.sort_unstable_by_key(|s| s.len());
3318                let driver = sources[0];
3319                let rest = &sources[1..];
3320
3321                driver
3322                    .iter()
3323                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3324                    .filter(|id| {
3325                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3326                    })
3327                    .count()
3328            }
3329        }
3330    }
3331
3332    // Returns whether any order in `bucket` matches the optional filter parameters.
3333    //
3334    // Mirrors `count_orders_in_bucket` but short-circuits on the first match. Useful for
3335    // `is_empty`-style gating in hot paths where the caller only needs to know whether at
3336    // least one matching order exists.
3337    fn any_orders_in_bucket(
3338        &self,
3339        bucket: &AHashSet<ClientOrderId>,
3340        venue: Option<&Venue>,
3341        instrument_id: Option<&InstrumentId>,
3342        strategy_id: Option<&StrategyId>,
3343        account_id: Option<&AccountId>,
3344        side: Option<OrderSide>,
3345    ) -> bool {
3346        let side = side.unwrap_or(OrderSide::NoOrderSide);
3347
3348        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
3349            FilterSources::Empty => false,
3350            FilterSources::Unfiltered => {
3351                if side == OrderSide::NoOrderSide {
3352                    !bucket.is_empty()
3353                } else {
3354                    bucket.iter().any(|id| self.order_side_matches(id, side))
3355                }
3356            }
3357            FilterSources::Sets(mut sources) => {
3358                sources.push(bucket);
3359                sources.sort_unstable_by_key(|s| s.len());
3360                let driver = sources[0];
3361                let rest = &sources[1..];
3362
3363                driver
3364                    .iter()
3365                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3366                    .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
3367            }
3368        }
3369    }
3370
3371    fn any_positions_in_bucket(
3372        &self,
3373        bucket: &AHashSet<PositionId>,
3374        venue: Option<&Venue>,
3375        instrument_id: Option<&InstrumentId>,
3376        strategy_id: Option<&StrategyId>,
3377        account_id: Option<&AccountId>,
3378        side: Option<PositionSide>,
3379    ) -> bool {
3380        let side = side.unwrap_or(PositionSide::NoPositionSide);
3381
3382        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
3383            FilterSources::Empty => false,
3384            FilterSources::Unfiltered => {
3385                if side == PositionSide::NoPositionSide {
3386                    !bucket.is_empty()
3387                } else {
3388                    bucket.iter().any(|id| self.position_side_matches(id, side))
3389                }
3390            }
3391            FilterSources::Sets(mut sources) => {
3392                sources.push(bucket);
3393                sources.sort_unstable_by_key(|s| s.len());
3394                let driver = sources[0];
3395                let rest = &sources[1..];
3396
3397                driver
3398                    .iter()
3399                    .filter(|id| rest.iter().all(|s| s.contains(id)))
3400                    .any(|id| {
3401                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
3402                    })
3403            }
3404        }
3405    }
3406
3407    fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
3408        self.orders
3409            .get(client_order_id)
3410            .is_some_and(|cell| cell.borrow().order_side() == side)
3411    }
3412
3413    fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
3414        self.positions
3415            .get(position_id)
3416            .is_some_and(|cell| cell.borrow().side == side)
3417    }
3418
3419    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
3420    ///
3421    /// # Panics
3422    ///
3423    /// Panics if any `client_order_id` in the set is not found in the cache.
3424    fn get_orders_for_ids(
3425        &self,
3426        client_order_ids: &AHashSet<ClientOrderId>,
3427        side: Option<OrderSide>,
3428    ) -> Vec<OrderRef<'_>> {
3429        let side = side.unwrap_or(OrderSide::NoOrderSide);
3430        let mut orders = Vec::new();
3431
3432        for client_order_id in client_order_ids {
3433            let order_cell = self
3434                .orders
3435                .get(client_order_id)
3436                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
3437            let order = OrderRef::new(order_cell.borrow());
3438
3439            if side == OrderSide::NoOrderSide || side == order.order_side() {
3440                orders.push(order);
3441            }
3442        }
3443
3444        // Sort so callers receive a deterministic Vec across runs; the
3445        // underlying client_order_ids set is AHash-backed.
3446        orders.sort_by_key(|o| o.client_order_id());
3447        orders
3448    }
3449
3450    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
3451    ///
3452    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
3453    /// those positions while the vector is alive will panic at runtime. Drop the vector before
3454    /// issuing writes.
3455    ///
3456    /// # Panics
3457    ///
3458    /// Panics if any `position_id` in the set is not found in the cache.
3459    fn get_positions_for_ids(
3460        &self,
3461        position_ids: &AHashSet<PositionId>,
3462        side: Option<PositionSide>,
3463    ) -> Vec<PositionRef<'_>> {
3464        let side = side.unwrap_or(PositionSide::NoPositionSide);
3465        let mut positions = Vec::new();
3466
3467        for position_id in position_ids {
3468            let position_cell = self
3469                .positions
3470                .get(position_id)
3471                .unwrap_or_else(|| panic!("Position {position_id} not found"));
3472            let position = PositionRef::new(position_cell.borrow());
3473
3474            if side == PositionSide::NoPositionSide || side == position.side {
3475                positions.push(position);
3476            }
3477        }
3478
3479        // Sort so callers receive a deterministic Vec across runs; the
3480        // underlying position_ids set is AHash-backed.
3481        positions.sort_by_key(|p| p.id);
3482        positions
3483    }
3484
3485    /// Returns the `ClientOrderId`s of all orders.
3486    #[must_use]
3487    pub fn client_order_ids(
3488        &self,
3489        venue: Option<&Venue>,
3490        instrument_id: Option<&InstrumentId>,
3491        strategy_id: Option<&StrategyId>,
3492        account_id: Option<&AccountId>,
3493    ) -> AHashSet<ClientOrderId> {
3494        self.query_orders_in_bucket(
3495            &self.index.orders,
3496            venue,
3497            instrument_id,
3498            strategy_id,
3499            account_id,
3500        )
3501    }
3502
3503    /// Returns the `ClientOrderId`s of all open orders.
3504    #[must_use]
3505    pub fn client_order_ids_open(
3506        &self,
3507        venue: Option<&Venue>,
3508        instrument_id: Option<&InstrumentId>,
3509        strategy_id: Option<&StrategyId>,
3510        account_id: Option<&AccountId>,
3511    ) -> AHashSet<ClientOrderId> {
3512        self.query_orders_in_bucket(
3513            &self.index.orders_open,
3514            venue,
3515            instrument_id,
3516            strategy_id,
3517            account_id,
3518        )
3519    }
3520
3521    /// Returns the `ClientOrderId`s of all closed orders.
3522    #[must_use]
3523    pub fn client_order_ids_closed(
3524        &self,
3525        venue: Option<&Venue>,
3526        instrument_id: Option<&InstrumentId>,
3527        strategy_id: Option<&StrategyId>,
3528        account_id: Option<&AccountId>,
3529    ) -> AHashSet<ClientOrderId> {
3530        self.query_orders_in_bucket(
3531            &self.index.orders_closed,
3532            venue,
3533            instrument_id,
3534            strategy_id,
3535            account_id,
3536        )
3537    }
3538
3539    /// Returns the `ClientOrderId`s of all locally active orders.
3540    ///
3541    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
3542    /// (a superset of emulated orders).
3543    #[must_use]
3544    pub fn client_order_ids_active_local(
3545        &self,
3546        venue: Option<&Venue>,
3547        instrument_id: Option<&InstrumentId>,
3548        strategy_id: Option<&StrategyId>,
3549        account_id: Option<&AccountId>,
3550    ) -> AHashSet<ClientOrderId> {
3551        self.query_orders_in_bucket(
3552            &self.index.orders_active_local,
3553            venue,
3554            instrument_id,
3555            strategy_id,
3556            account_id,
3557        )
3558    }
3559
3560    /// Returns the `ClientOrderId`s of all emulated orders.
3561    #[must_use]
3562    pub fn client_order_ids_emulated(
3563        &self,
3564        venue: Option<&Venue>,
3565        instrument_id: Option<&InstrumentId>,
3566        strategy_id: Option<&StrategyId>,
3567        account_id: Option<&AccountId>,
3568    ) -> AHashSet<ClientOrderId> {
3569        self.query_orders_in_bucket(
3570            &self.index.orders_emulated,
3571            venue,
3572            instrument_id,
3573            strategy_id,
3574            account_id,
3575        )
3576    }
3577
3578    /// Returns the `ClientOrderId`s of all in-flight orders.
3579    #[must_use]
3580    pub fn client_order_ids_inflight(
3581        &self,
3582        venue: Option<&Venue>,
3583        instrument_id: Option<&InstrumentId>,
3584        strategy_id: Option<&StrategyId>,
3585        account_id: Option<&AccountId>,
3586    ) -> AHashSet<ClientOrderId> {
3587        self.query_orders_in_bucket(
3588            &self.index.orders_inflight,
3589            venue,
3590            instrument_id,
3591            strategy_id,
3592            account_id,
3593        )
3594    }
3595
3596    /// Returns `PositionId`s of all positions.
3597    #[must_use]
3598    pub fn position_ids(
3599        &self,
3600        venue: Option<&Venue>,
3601        instrument_id: Option<&InstrumentId>,
3602        strategy_id: Option<&StrategyId>,
3603        account_id: Option<&AccountId>,
3604    ) -> AHashSet<PositionId> {
3605        self.query_positions_in_bucket(
3606            &self.index.positions,
3607            venue,
3608            instrument_id,
3609            strategy_id,
3610            account_id,
3611        )
3612    }
3613
3614    /// Returns the `PositionId`s of all open positions.
3615    #[must_use]
3616    pub fn position_open_ids(
3617        &self,
3618        venue: Option<&Venue>,
3619        instrument_id: Option<&InstrumentId>,
3620        strategy_id: Option<&StrategyId>,
3621        account_id: Option<&AccountId>,
3622    ) -> AHashSet<PositionId> {
3623        self.query_positions_in_bucket(
3624            &self.index.positions_open,
3625            venue,
3626            instrument_id,
3627            strategy_id,
3628            account_id,
3629        )
3630    }
3631
3632    /// Returns the `PositionId`s of all closed positions.
3633    #[must_use]
3634    pub fn position_closed_ids(
3635        &self,
3636        venue: Option<&Venue>,
3637        instrument_id: Option<&InstrumentId>,
3638        strategy_id: Option<&StrategyId>,
3639        account_id: Option<&AccountId>,
3640    ) -> AHashSet<PositionId> {
3641        self.query_positions_in_bucket(
3642            &self.index.positions_closed,
3643            venue,
3644            instrument_id,
3645            strategy_id,
3646            account_id,
3647        )
3648    }
3649
3650    /// Returns a borrowed view over the [`ClientOrderId`]s of all orders matching the optional
3651    /// filter parameters.
3652    ///
3653    /// The returned [`Cow`] borrows the underlying index when no filter is provided and only
3654    /// allocates an owned [`AHashSet`] when an intersection is required. Prefer this over
3655    /// [`Self::client_order_ids`] when the caller only needs to iterate or read membership.
3656    #[must_use]
3657    pub fn client_order_ids_view(
3658        &self,
3659        venue: Option<&Venue>,
3660        instrument_id: Option<&InstrumentId>,
3661        strategy_id: Option<&StrategyId>,
3662        account_id: Option<&AccountId>,
3663    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3664        self.view_orders_in_bucket(
3665            &self.index.orders,
3666            venue,
3667            instrument_id,
3668            strategy_id,
3669            account_id,
3670        )
3671    }
3672
3673    /// Returns a borrowed view over the [`ClientOrderId`]s of all open orders.
3674    #[must_use]
3675    pub fn client_order_ids_open_view(
3676        &self,
3677        venue: Option<&Venue>,
3678        instrument_id: Option<&InstrumentId>,
3679        strategy_id: Option<&StrategyId>,
3680        account_id: Option<&AccountId>,
3681    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3682        self.view_orders_in_bucket(
3683            &self.index.orders_open,
3684            venue,
3685            instrument_id,
3686            strategy_id,
3687            account_id,
3688        )
3689    }
3690
3691    /// Returns a borrowed view over the [`ClientOrderId`]s of all closed orders.
3692    #[must_use]
3693    pub fn client_order_ids_closed_view(
3694        &self,
3695        venue: Option<&Venue>,
3696        instrument_id: Option<&InstrumentId>,
3697        strategy_id: Option<&StrategyId>,
3698        account_id: Option<&AccountId>,
3699    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3700        self.view_orders_in_bucket(
3701            &self.index.orders_closed,
3702            venue,
3703            instrument_id,
3704            strategy_id,
3705            account_id,
3706        )
3707    }
3708
3709    /// Returns a borrowed view over the [`ClientOrderId`]s of all locally active orders.
3710    #[must_use]
3711    pub fn client_order_ids_active_local_view(
3712        &self,
3713        venue: Option<&Venue>,
3714        instrument_id: Option<&InstrumentId>,
3715        strategy_id: Option<&StrategyId>,
3716        account_id: Option<&AccountId>,
3717    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3718        self.view_orders_in_bucket(
3719            &self.index.orders_active_local,
3720            venue,
3721            instrument_id,
3722            strategy_id,
3723            account_id,
3724        )
3725    }
3726
3727    /// Returns a borrowed view over the [`ClientOrderId`]s of all emulated orders.
3728    #[must_use]
3729    pub fn client_order_ids_emulated_view(
3730        &self,
3731        venue: Option<&Venue>,
3732        instrument_id: Option<&InstrumentId>,
3733        strategy_id: Option<&StrategyId>,
3734        account_id: Option<&AccountId>,
3735    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3736        self.view_orders_in_bucket(
3737            &self.index.orders_emulated,
3738            venue,
3739            instrument_id,
3740            strategy_id,
3741            account_id,
3742        )
3743    }
3744
3745    /// Returns a borrowed view over the [`ClientOrderId`]s of all in-flight orders.
3746    #[must_use]
3747    pub fn client_order_ids_inflight_view(
3748        &self,
3749        venue: Option<&Venue>,
3750        instrument_id: Option<&InstrumentId>,
3751        strategy_id: Option<&StrategyId>,
3752        account_id: Option<&AccountId>,
3753    ) -> Cow<'_, AHashSet<ClientOrderId>> {
3754        self.view_orders_in_bucket(
3755            &self.index.orders_inflight,
3756            venue,
3757            instrument_id,
3758            strategy_id,
3759            account_id,
3760        )
3761    }
3762
3763    /// Returns a borrowed view over the [`PositionId`]s of all positions.
3764    #[must_use]
3765    pub fn position_ids_view(
3766        &self,
3767        venue: Option<&Venue>,
3768        instrument_id: Option<&InstrumentId>,
3769        strategy_id: Option<&StrategyId>,
3770        account_id: Option<&AccountId>,
3771    ) -> Cow<'_, AHashSet<PositionId>> {
3772        self.view_positions_in_bucket(
3773            &self.index.positions,
3774            venue,
3775            instrument_id,
3776            strategy_id,
3777            account_id,
3778        )
3779    }
3780
3781    /// Returns a borrowed view over the [`PositionId`]s of all open positions.
3782    #[must_use]
3783    pub fn position_open_ids_view(
3784        &self,
3785        venue: Option<&Venue>,
3786        instrument_id: Option<&InstrumentId>,
3787        strategy_id: Option<&StrategyId>,
3788        account_id: Option<&AccountId>,
3789    ) -> Cow<'_, AHashSet<PositionId>> {
3790        self.view_positions_in_bucket(
3791            &self.index.positions_open,
3792            venue,
3793            instrument_id,
3794            strategy_id,
3795            account_id,
3796        )
3797    }
3798
3799    /// Returns a borrowed view over the [`PositionId`]s of all closed positions.
3800    #[must_use]
3801    pub fn position_closed_ids_view(
3802        &self,
3803        venue: Option<&Venue>,
3804        instrument_id: Option<&InstrumentId>,
3805        strategy_id: Option<&StrategyId>,
3806        account_id: Option<&AccountId>,
3807    ) -> Cow<'_, AHashSet<PositionId>> {
3808        self.view_positions_in_bucket(
3809            &self.index.positions_closed,
3810            venue,
3811            instrument_id,
3812            strategy_id,
3813            account_id,
3814        )
3815    }
3816
3817    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all orders matching the optional
3818    /// filter parameters.
3819    ///
3820    /// Avoids the [`AHashSet`] allocation performed by [`Self::client_order_ids`]. Useful when
3821    /// the caller iterates the result once and discards it.
3822    pub fn iter_client_order_ids(
3823        &self,
3824        venue: Option<&Venue>,
3825        instrument_id: Option<&InstrumentId>,
3826        strategy_id: Option<&StrategyId>,
3827        account_id: Option<&AccountId>,
3828    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3829        self.iter_orders_in_bucket(
3830            &self.index.orders,
3831            venue,
3832            instrument_id,
3833            strategy_id,
3834            account_id,
3835        )
3836    }
3837
3838    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all open orders.
3839    pub fn iter_client_order_ids_open(
3840        &self,
3841        venue: Option<&Venue>,
3842        instrument_id: Option<&InstrumentId>,
3843        strategy_id: Option<&StrategyId>,
3844        account_id: Option<&AccountId>,
3845    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3846        self.iter_orders_in_bucket(
3847            &self.index.orders_open,
3848            venue,
3849            instrument_id,
3850            strategy_id,
3851            account_id,
3852        )
3853    }
3854
3855    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all closed orders.
3856    pub fn iter_client_order_ids_closed(
3857        &self,
3858        venue: Option<&Venue>,
3859        instrument_id: Option<&InstrumentId>,
3860        strategy_id: Option<&StrategyId>,
3861        account_id: Option<&AccountId>,
3862    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3863        self.iter_orders_in_bucket(
3864            &self.index.orders_closed,
3865            venue,
3866            instrument_id,
3867            strategy_id,
3868            account_id,
3869        )
3870    }
3871
3872    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all locally active orders.
3873    pub fn iter_client_order_ids_active_local(
3874        &self,
3875        venue: Option<&Venue>,
3876        instrument_id: Option<&InstrumentId>,
3877        strategy_id: Option<&StrategyId>,
3878        account_id: Option<&AccountId>,
3879    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3880        self.iter_orders_in_bucket(
3881            &self.index.orders_active_local,
3882            venue,
3883            instrument_id,
3884            strategy_id,
3885            account_id,
3886        )
3887    }
3888
3889    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all emulated orders.
3890    pub fn iter_client_order_ids_emulated(
3891        &self,
3892        venue: Option<&Venue>,
3893        instrument_id: Option<&InstrumentId>,
3894        strategy_id: Option<&StrategyId>,
3895        account_id: Option<&AccountId>,
3896    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3897        self.iter_orders_in_bucket(
3898            &self.index.orders_emulated,
3899            venue,
3900            instrument_id,
3901            strategy_id,
3902            account_id,
3903        )
3904    }
3905
3906    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all in-flight orders.
3907    pub fn iter_client_order_ids_inflight(
3908        &self,
3909        venue: Option<&Venue>,
3910        instrument_id: Option<&InstrumentId>,
3911        strategy_id: Option<&StrategyId>,
3912        account_id: Option<&AccountId>,
3913    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
3914        self.iter_orders_in_bucket(
3915            &self.index.orders_inflight,
3916            venue,
3917            instrument_id,
3918            strategy_id,
3919            account_id,
3920        )
3921    }
3922
3923    /// Returns a lazy iterator yielding [`PositionId`]s of all positions matching the filters.
3924    pub fn iter_position_ids(
3925        &self,
3926        venue: Option<&Venue>,
3927        instrument_id: Option<&InstrumentId>,
3928        strategy_id: Option<&StrategyId>,
3929        account_id: Option<&AccountId>,
3930    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3931        self.iter_positions_in_bucket(
3932            &self.index.positions,
3933            venue,
3934            instrument_id,
3935            strategy_id,
3936            account_id,
3937        )
3938    }
3939
3940    /// Returns a lazy iterator yielding [`PositionId`]s of all open positions.
3941    pub fn iter_position_open_ids(
3942        &self,
3943        venue: Option<&Venue>,
3944        instrument_id: Option<&InstrumentId>,
3945        strategy_id: Option<&StrategyId>,
3946        account_id: Option<&AccountId>,
3947    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3948        self.iter_positions_in_bucket(
3949            &self.index.positions_open,
3950            venue,
3951            instrument_id,
3952            strategy_id,
3953            account_id,
3954        )
3955    }
3956
3957    /// Returns a lazy iterator yielding [`PositionId`]s of all closed positions.
3958    pub fn iter_position_closed_ids(
3959        &self,
3960        venue: Option<&Venue>,
3961        instrument_id: Option<&InstrumentId>,
3962        strategy_id: Option<&StrategyId>,
3963        account_id: Option<&AccountId>,
3964    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
3965        self.iter_positions_in_bucket(
3966            &self.index.positions_closed,
3967            venue,
3968            instrument_id,
3969            strategy_id,
3970            account_id,
3971        )
3972    }
3973
3974    /// Returns the `ComponentId`s of all actors.
3975    #[must_use]
3976    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
3977        self.index.actors.clone()
3978    }
3979
3980    /// Returns the `StrategyId`s of all strategies.
3981    #[must_use]
3982    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
3983        self.index.strategies.clone()
3984    }
3985
3986    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
3987    #[must_use]
3988    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
3989        self.index.exec_algorithms.clone()
3990    }
3991
3992    // -- ORDER QUERIES ---------------------------------------------------------------------------
3993
3994    /// Gets a borrow of the order with the `client_order_id` (if found).
3995    ///
3996    /// The returned [`OrderRef`] is tied to the cache borrow's scope and panics at runtime if
3997    /// held across a mutation of the same order. Drop the borrow before dispatching events; if
3998    /// post-event state is required, perform a fresh lookup. Use [`Self::order_owned`] when an
3999    /// owned snapshot is needed for a boundary handover.
4000    #[must_use]
4001    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
4002        self.orders
4003            .get(client_order_id)
4004            .map(|order_cell| OrderRef::new(order_cell.borrow()))
4005    }
4006
4007    /// Gets an exclusive write borrow of the order with the `client_order_id` (if found).
4008    ///
4009    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
4010    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
4011    /// exposes immutable cache borrows and therefore cannot reach this method.
4012    ///
4013    /// While the returned [`OrderRefMut`] is alive, no other read or write of the same order is
4014    /// permitted. Drop the borrow before dispatching events or taking any other cache borrow that
4015    /// may re-enter the same order.
4016    #[must_use]
4017    pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
4018        self.orders
4019            .get(client_order_id)
4020            .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
4021    }
4022
4023    /// Gets an owned snapshot of the order with the `client_order_id` (if found).
4024    ///
4025    /// Use when downstream needs an owned [`OrderAny`] that crosses a boundary (for example, an
4026    /// adapter `get_order` API). The snapshot will not reflect later cache mutations.
4027    #[must_use]
4028    pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
4029        self.orders
4030            .get(client_order_id)
4031            .map(|order_cell| order_cell.borrow().clone())
4032    }
4033
4034    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
4035    #[must_use]
4036    pub fn orders_for_ids(
4037        &self,
4038        client_order_ids: &[ClientOrderId],
4039        context: &dyn Display,
4040    ) -> Vec<OrderAny> {
4041        let mut orders = Vec::with_capacity(client_order_ids.len());
4042        for id in client_order_ids {
4043            match self.orders.get(id) {
4044                Some(order_cell) => orders.push(order_cell.borrow().clone()),
4045                None => log::error!("Order {id} not found in cache for {context}"),
4046            }
4047        }
4048        orders
4049    }
4050
4051    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
4052    #[must_use]
4053    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
4054        self.index.venue_order_ids.get(venue_order_id)
4055    }
4056
4057    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
4058    #[must_use]
4059    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
4060        self.index.client_order_ids.get(client_order_id)
4061    }
4062
4063    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
4064    #[must_use]
4065    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
4066        self.index.order_client.get(client_order_id)
4067    }
4068
4069    /// Returns borrows of all orders matching the optional filter parameters.
4070    ///
4071    /// Each [`Ref`] in the returned vector borrows its underlying cell; mutating any of
4072    /// those orders while the vector is alive will panic at runtime. Drop the vector
4073    /// before issuing writes.
4074    #[must_use]
4075    pub fn orders(
4076        &self,
4077        venue: Option<&Venue>,
4078        instrument_id: Option<&InstrumentId>,
4079        strategy_id: Option<&StrategyId>,
4080        account_id: Option<&AccountId>,
4081        side: Option<OrderSide>,
4082    ) -> Vec<OrderRef<'_>> {
4083        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
4084        self.get_orders_for_ids(&client_order_ids, side)
4085    }
4086
4087    /// Returns borrows of all open orders matching the optional filter parameters.
4088    #[must_use]
4089    pub fn orders_open(
4090        &self,
4091        venue: Option<&Venue>,
4092        instrument_id: Option<&InstrumentId>,
4093        strategy_id: Option<&StrategyId>,
4094        account_id: Option<&AccountId>,
4095        side: Option<OrderSide>,
4096    ) -> Vec<OrderRef<'_>> {
4097        let client_order_ids =
4098            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
4099        self.get_orders_for_ids(&client_order_ids, side)
4100    }
4101
4102    /// Returns borrows of all closed orders matching the optional filter parameters.
4103    #[must_use]
4104    pub fn orders_closed(
4105        &self,
4106        venue: Option<&Venue>,
4107        instrument_id: Option<&InstrumentId>,
4108        strategy_id: Option<&StrategyId>,
4109        account_id: Option<&AccountId>,
4110        side: Option<OrderSide>,
4111    ) -> Vec<OrderRef<'_>> {
4112        let client_order_ids =
4113            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
4114        self.get_orders_for_ids(&client_order_ids, side)
4115    }
4116
4117    /// Returns borrows of all locally active orders matching the optional filter parameters.
4118    ///
4119    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4120    /// (a superset of emulated orders).
4121    #[must_use]
4122    pub fn orders_active_local(
4123        &self,
4124        venue: Option<&Venue>,
4125        instrument_id: Option<&InstrumentId>,
4126        strategy_id: Option<&StrategyId>,
4127        account_id: Option<&AccountId>,
4128        side: Option<OrderSide>,
4129    ) -> Vec<OrderRef<'_>> {
4130        let client_order_ids =
4131            self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
4132        self.get_orders_for_ids(&client_order_ids, side)
4133    }
4134
4135    /// Returns borrows of all emulated orders matching the optional filter parameters.
4136    #[must_use]
4137    pub fn orders_emulated(
4138        &self,
4139        venue: Option<&Venue>,
4140        instrument_id: Option<&InstrumentId>,
4141        strategy_id: Option<&StrategyId>,
4142        account_id: Option<&AccountId>,
4143        side: Option<OrderSide>,
4144    ) -> Vec<OrderRef<'_>> {
4145        let client_order_ids =
4146            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
4147        self.get_orders_for_ids(&client_order_ids, side)
4148    }
4149
4150    /// Returns borrows of all in-flight orders matching the optional filter parameters.
4151    #[must_use]
4152    pub fn orders_inflight(
4153        &self,
4154        venue: Option<&Venue>,
4155        instrument_id: Option<&InstrumentId>,
4156        strategy_id: Option<&StrategyId>,
4157        account_id: Option<&AccountId>,
4158        side: Option<OrderSide>,
4159    ) -> Vec<OrderRef<'_>> {
4160        let client_order_ids =
4161            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
4162        self.get_orders_for_ids(&client_order_ids, side)
4163    }
4164
4165    /// Returns borrows of all orders for the `position_id`.
4166    #[must_use]
4167    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
4168        match self.index.position_orders.get(position_id) {
4169            Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
4170            None => Vec::new(),
4171        }
4172    }
4173
4174    /// Returns whether an order with the `client_order_id` exists.
4175    #[must_use]
4176    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
4177        self.index.orders.contains(client_order_id)
4178    }
4179
4180    /// Returns whether an order with the `client_order_id` is open.
4181    #[must_use]
4182    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
4183        self.index.orders_open.contains(client_order_id)
4184    }
4185
4186    /// Returns whether an order with the `client_order_id` is closed.
4187    #[must_use]
4188    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
4189        self.index.orders_closed.contains(client_order_id)
4190    }
4191
4192    /// Returns whether an order with the `client_order_id` is locally active.
4193    ///
4194    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4195    /// (a superset of emulated orders).
4196    #[must_use]
4197    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
4198        self.index.orders_active_local.contains(client_order_id)
4199    }
4200
4201    /// Returns whether an order with the `client_order_id` is emulated.
4202    #[must_use]
4203    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
4204        self.index.orders_emulated.contains(client_order_id)
4205    }
4206
4207    /// Returns whether an order with the `client_order_id` is in-flight.
4208    #[must_use]
4209    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
4210        self.index.orders_inflight.contains(client_order_id)
4211    }
4212
4213    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
4214    #[must_use]
4215    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
4216        self.index.orders_pending_cancel.contains(client_order_id)
4217    }
4218
4219    /// Returns the count of all open orders.
4220    #[must_use]
4221    pub fn orders_open_count(
4222        &self,
4223        venue: Option<&Venue>,
4224        instrument_id: Option<&InstrumentId>,
4225        strategy_id: Option<&StrategyId>,
4226        account_id: Option<&AccountId>,
4227        side: Option<OrderSide>,
4228    ) -> usize {
4229        self.count_orders_in_bucket(
4230            &self.index.orders_open,
4231            venue,
4232            instrument_id,
4233            strategy_id,
4234            account_id,
4235            side,
4236        )
4237    }
4238
4239    /// Returns the count of all closed orders.
4240    #[must_use]
4241    pub fn orders_closed_count(
4242        &self,
4243        venue: Option<&Venue>,
4244        instrument_id: Option<&InstrumentId>,
4245        strategy_id: Option<&StrategyId>,
4246        account_id: Option<&AccountId>,
4247        side: Option<OrderSide>,
4248    ) -> usize {
4249        self.count_orders_in_bucket(
4250            &self.index.orders_closed,
4251            venue,
4252            instrument_id,
4253            strategy_id,
4254            account_id,
4255            side,
4256        )
4257    }
4258
4259    /// Returns the count of all locally active orders.
4260    ///
4261    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
4262    /// (a superset of emulated orders).
4263    #[must_use]
4264    pub fn orders_active_local_count(
4265        &self,
4266        venue: Option<&Venue>,
4267        instrument_id: Option<&InstrumentId>,
4268        strategy_id: Option<&StrategyId>,
4269        account_id: Option<&AccountId>,
4270        side: Option<OrderSide>,
4271    ) -> usize {
4272        self.count_orders_in_bucket(
4273            &self.index.orders_active_local,
4274            venue,
4275            instrument_id,
4276            strategy_id,
4277            account_id,
4278            side,
4279        )
4280    }
4281
4282    /// Returns the count of all emulated orders.
4283    #[must_use]
4284    pub fn orders_emulated_count(
4285        &self,
4286        venue: Option<&Venue>,
4287        instrument_id: Option<&InstrumentId>,
4288        strategy_id: Option<&StrategyId>,
4289        account_id: Option<&AccountId>,
4290        side: Option<OrderSide>,
4291    ) -> usize {
4292        self.count_orders_in_bucket(
4293            &self.index.orders_emulated,
4294            venue,
4295            instrument_id,
4296            strategy_id,
4297            account_id,
4298            side,
4299        )
4300    }
4301
4302    /// Returns the count of all in-flight orders.
4303    #[must_use]
4304    pub fn orders_inflight_count(
4305        &self,
4306        venue: Option<&Venue>,
4307        instrument_id: Option<&InstrumentId>,
4308        strategy_id: Option<&StrategyId>,
4309        account_id: Option<&AccountId>,
4310        side: Option<OrderSide>,
4311    ) -> usize {
4312        self.count_orders_in_bucket(
4313            &self.index.orders_inflight,
4314            venue,
4315            instrument_id,
4316            strategy_id,
4317            account_id,
4318            side,
4319        )
4320    }
4321
4322    /// Returns the count of all orders.
4323    #[must_use]
4324    pub fn orders_total_count(
4325        &self,
4326        venue: Option<&Venue>,
4327        instrument_id: Option<&InstrumentId>,
4328        strategy_id: Option<&StrategyId>,
4329        account_id: Option<&AccountId>,
4330        side: Option<OrderSide>,
4331    ) -> usize {
4332        self.count_orders_in_bucket(
4333            &self.index.orders,
4334            venue,
4335            instrument_id,
4336            strategy_id,
4337            account_id,
4338            side,
4339        )
4340    }
4341
4342    /// Returns whether any open order matches the optional filter parameters.
4343    ///
4344    /// Short-circuits on the first match, avoiding the full intersection walk performed by
4345    /// [`Self::orders_open_count`]. Prefer this over `orders_open_count(...) > 0` when only
4346    /// existence matters.
4347    #[must_use]
4348    pub fn has_orders_open(
4349        &self,
4350        venue: Option<&Venue>,
4351        instrument_id: Option<&InstrumentId>,
4352        strategy_id: Option<&StrategyId>,
4353        account_id: Option<&AccountId>,
4354        side: Option<OrderSide>,
4355    ) -> bool {
4356        self.any_orders_in_bucket(
4357            &self.index.orders_open,
4358            venue,
4359            instrument_id,
4360            strategy_id,
4361            account_id,
4362            side,
4363        )
4364    }
4365
4366    /// Returns whether any closed order matches the optional filter parameters.
4367    #[must_use]
4368    pub fn has_orders_closed(
4369        &self,
4370        venue: Option<&Venue>,
4371        instrument_id: Option<&InstrumentId>,
4372        strategy_id: Option<&StrategyId>,
4373        account_id: Option<&AccountId>,
4374        side: Option<OrderSide>,
4375    ) -> bool {
4376        self.any_orders_in_bucket(
4377            &self.index.orders_closed,
4378            venue,
4379            instrument_id,
4380            strategy_id,
4381            account_id,
4382            side,
4383        )
4384    }
4385
4386    /// Returns whether any locally active order matches the optional filter parameters.
4387    ///
4388    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state.
4389    #[must_use]
4390    pub fn has_orders_active_local(
4391        &self,
4392        venue: Option<&Venue>,
4393        instrument_id: Option<&InstrumentId>,
4394        strategy_id: Option<&StrategyId>,
4395        account_id: Option<&AccountId>,
4396        side: Option<OrderSide>,
4397    ) -> bool {
4398        self.any_orders_in_bucket(
4399            &self.index.orders_active_local,
4400            venue,
4401            instrument_id,
4402            strategy_id,
4403            account_id,
4404            side,
4405        )
4406    }
4407
4408    /// Returns whether any emulated order matches the optional filter parameters.
4409    #[must_use]
4410    pub fn has_orders_emulated(
4411        &self,
4412        venue: Option<&Venue>,
4413        instrument_id: Option<&InstrumentId>,
4414        strategy_id: Option<&StrategyId>,
4415        account_id: Option<&AccountId>,
4416        side: Option<OrderSide>,
4417    ) -> bool {
4418        self.any_orders_in_bucket(
4419            &self.index.orders_emulated,
4420            venue,
4421            instrument_id,
4422            strategy_id,
4423            account_id,
4424            side,
4425        )
4426    }
4427
4428    /// Returns whether any in-flight order matches the optional filter parameters.
4429    #[must_use]
4430    pub fn has_orders_inflight(
4431        &self,
4432        venue: Option<&Venue>,
4433        instrument_id: Option<&InstrumentId>,
4434        strategy_id: Option<&StrategyId>,
4435        account_id: Option<&AccountId>,
4436        side: Option<OrderSide>,
4437    ) -> bool {
4438        self.any_orders_in_bucket(
4439            &self.index.orders_inflight,
4440            venue,
4441            instrument_id,
4442            strategy_id,
4443            account_id,
4444            side,
4445        )
4446    }
4447
4448    /// Returns whether any order (in any state) matches the optional filter parameters.
4449    #[must_use]
4450    pub fn has_orders(
4451        &self,
4452        venue: Option<&Venue>,
4453        instrument_id: Option<&InstrumentId>,
4454        strategy_id: Option<&StrategyId>,
4455        account_id: Option<&AccountId>,
4456        side: Option<OrderSide>,
4457    ) -> bool {
4458        self.any_orders_in_bucket(
4459            &self.index.orders,
4460            venue,
4461            instrument_id,
4462            strategy_id,
4463            account_id,
4464            side,
4465        )
4466    }
4467
4468    /// Returns the order list for the `order_list_id`.
4469    #[must_use]
4470    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
4471        self.order_lists.get(order_list_id)
4472    }
4473
4474    /// Returns all order lists matching the optional filter parameters.
4475    #[must_use]
4476    pub fn order_lists(
4477        &self,
4478        venue: Option<&Venue>,
4479        instrument_id: Option<&InstrumentId>,
4480        strategy_id: Option<&StrategyId>,
4481        account_id: Option<&AccountId>,
4482    ) -> Vec<&OrderList> {
4483        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
4484
4485        if let Some(venue) = venue {
4486            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
4487        }
4488
4489        if let Some(instrument_id) = instrument_id {
4490            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
4491        }
4492
4493        if let Some(strategy_id) = strategy_id {
4494            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
4495        }
4496
4497        if let Some(account_id) = account_id {
4498            order_lists.retain(|ol| {
4499                ol.client_order_ids.iter().any(|client_order_id| {
4500                    self.orders.get(client_order_id).is_some_and(|order_cell| {
4501                        order_cell.borrow().account_id().as_ref() == Some(account_id)
4502                    })
4503                })
4504            });
4505        }
4506
4507        order_lists
4508    }
4509
4510    /// Returns whether an order list with the `order_list_id` exists.
4511    #[must_use]
4512    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
4513        self.order_lists.contains_key(order_list_id)
4514    }
4515
4516    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
4517
4518    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
4519    /// optional filter parameters.
4520    #[must_use]
4521    pub fn orders_for_exec_algorithm(
4522        &self,
4523        exec_algorithm_id: &ExecAlgorithmId,
4524        venue: Option<&Venue>,
4525        instrument_id: Option<&InstrumentId>,
4526        strategy_id: Option<&StrategyId>,
4527        account_id: Option<&AccountId>,
4528        side: Option<OrderSide>,
4529    ) -> Vec<OrderRef<'_>> {
4530        let Some(exec_algorithm_order_ids) =
4531            self.index.exec_algorithm_orders.get(exec_algorithm_id)
4532        else {
4533            return Vec::new();
4534        };
4535
4536        let filtered = self.query_orders_in_bucket(
4537            exec_algorithm_order_ids,
4538            venue,
4539            instrument_id,
4540            strategy_id,
4541            account_id,
4542        );
4543        self.get_orders_for_ids(&filtered, side)
4544    }
4545
4546    /// Returns references to all orders with the `exec_spawn_id`.
4547    #[must_use]
4548    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
4549        match self.index.exec_spawn_orders.get(exec_spawn_id) {
4550            Some(ids) => self.get_orders_for_ids(ids, None),
4551            None => Vec::new(),
4552        }
4553    }
4554
4555    /// Returns the total order quantity for the `exec_spawn_id`.
4556    #[must_use]
4557    pub fn exec_spawn_total_quantity(
4558        &self,
4559        exec_spawn_id: &ClientOrderId,
4560        active_only: bool,
4561    ) -> Option<Quantity> {
4562        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4563
4564        let mut total_quantity: Option<Quantity> = None;
4565
4566        for spawn_order in exec_spawn_orders {
4567            if active_only && spawn_order.is_closed() {
4568                continue;
4569            }
4570
4571            match total_quantity.as_mut() {
4572                Some(total) => *total = *total + spawn_order.quantity(),
4573                None => total_quantity = Some(spawn_order.quantity()),
4574            }
4575        }
4576
4577        total_quantity
4578    }
4579
4580    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
4581    #[must_use]
4582    pub fn exec_spawn_total_filled_qty(
4583        &self,
4584        exec_spawn_id: &ClientOrderId,
4585        active_only: bool,
4586    ) -> Option<Quantity> {
4587        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4588
4589        let mut total_quantity: Option<Quantity> = None;
4590
4591        for spawn_order in exec_spawn_orders {
4592            if active_only && spawn_order.is_closed() {
4593                continue;
4594            }
4595
4596            match total_quantity.as_mut() {
4597                Some(total) => *total = *total + spawn_order.filled_qty(),
4598                None => total_quantity = Some(spawn_order.filled_qty()),
4599            }
4600        }
4601
4602        total_quantity
4603    }
4604
4605    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
4606    #[must_use]
4607    pub fn exec_spawn_total_leaves_qty(
4608        &self,
4609        exec_spawn_id: &ClientOrderId,
4610        active_only: bool,
4611    ) -> Option<Quantity> {
4612        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
4613
4614        let mut total_quantity: Option<Quantity> = None;
4615
4616        for spawn_order in exec_spawn_orders {
4617            if active_only && spawn_order.is_closed() {
4618                continue;
4619            }
4620
4621            match total_quantity.as_mut() {
4622                Some(total) => *total = *total + spawn_order.leaves_qty(),
4623                None => total_quantity = Some(spawn_order.leaves_qty()),
4624            }
4625        }
4626
4627        total_quantity
4628    }
4629
4630    // -- POSITION QUERIES ------------------------------------------------------------------------
4631
4632    /// Returns a borrow of the position with the `position_id` (if found).
4633    #[must_use]
4634    pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
4635        self.positions
4636            .get(position_id)
4637            .map(|position_cell| PositionRef::new(position_cell.borrow()))
4638    }
4639
4640    /// Gets an exclusive write borrow of the position with the `position_id` (if found).
4641    ///
4642    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
4643    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
4644    /// exposes immutable cache borrows and therefore cannot reach this method.
4645    ///
4646    /// While the returned [`PositionRefMut`] is alive, no other read or write of the same position
4647    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
4648    /// that may re-enter the same position.
4649    #[must_use]
4650    pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
4651        self.positions
4652            .get(position_id)
4653            .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
4654    }
4655
4656    /// Gets an owned snapshot of the position with the `position_id` (if found).
4657    ///
4658    /// Use when downstream needs an owned [`Position`] that crosses a boundary. The snapshot will
4659    /// not reflect later cache mutations.
4660    #[must_use]
4661    pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
4662        self.positions
4663            .get(position_id)
4664            .map(|position_cell| position_cell.borrow().clone())
4665    }
4666
4667    /// Returns a borrow of the position for the `client_order_id` (if found).
4668    #[must_use]
4669    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
4670        self.index
4671            .order_position
4672            .get(client_order_id)
4673            .and_then(|position_id| self.positions.get(position_id))
4674            .map(|position_cell| PositionRef::new(position_cell.borrow()))
4675    }
4676
4677    /// Returns a reference to the position ID for the `client_order_id` (if found).
4678    #[must_use]
4679    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
4680        self.index.order_position.get(client_order_id)
4681    }
4682
4683    /// Returns borrows of all positions matching the optional filter parameters.
4684    ///
4685    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
4686    /// those positions while the vector is alive will panic at runtime. Drop the vector before
4687    /// issuing writes.
4688    #[must_use]
4689    pub fn positions(
4690        &self,
4691        venue: Option<&Venue>,
4692        instrument_id: Option<&InstrumentId>,
4693        strategy_id: Option<&StrategyId>,
4694        account_id: Option<&AccountId>,
4695        side: Option<PositionSide>,
4696    ) -> Vec<PositionRef<'_>> {
4697        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
4698        self.get_positions_for_ids(&position_ids, side)
4699    }
4700
4701    /// Returns borrows of all open positions matching the optional filter parameters.
4702    #[must_use]
4703    pub fn positions_open(
4704        &self,
4705        venue: Option<&Venue>,
4706        instrument_id: Option<&InstrumentId>,
4707        strategy_id: Option<&StrategyId>,
4708        account_id: Option<&AccountId>,
4709        side: Option<PositionSide>,
4710    ) -> Vec<PositionRef<'_>> {
4711        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
4712        self.get_positions_for_ids(&position_ids, side)
4713    }
4714
4715    /// Returns borrows of all closed positions matching the optional filter parameters.
4716    #[must_use]
4717    pub fn positions_closed(
4718        &self,
4719        venue: Option<&Venue>,
4720        instrument_id: Option<&InstrumentId>,
4721        strategy_id: Option<&StrategyId>,
4722        account_id: Option<&AccountId>,
4723        side: Option<PositionSide>,
4724    ) -> Vec<PositionRef<'_>> {
4725        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
4726        self.get_positions_for_ids(&position_ids, side)
4727    }
4728
4729    /// Returns whether a position with the `position_id` exists.
4730    #[must_use]
4731    pub fn position_exists(&self, position_id: &PositionId) -> bool {
4732        self.index.positions.contains(position_id)
4733    }
4734
4735    /// Returns whether a position with the `position_id` is open.
4736    #[must_use]
4737    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
4738        self.index.positions_open.contains(position_id)
4739    }
4740
4741    /// Returns whether a position with the `position_id` is closed.
4742    #[must_use]
4743    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
4744        self.index.positions_closed.contains(position_id)
4745    }
4746
4747    /// Returns the count of all open positions.
4748    #[must_use]
4749    pub fn positions_open_count(
4750        &self,
4751        venue: Option<&Venue>,
4752        instrument_id: Option<&InstrumentId>,
4753        strategy_id: Option<&StrategyId>,
4754        account_id: Option<&AccountId>,
4755        side: Option<PositionSide>,
4756    ) -> usize {
4757        self.count_positions_in_bucket(
4758            &self.index.positions_open,
4759            venue,
4760            instrument_id,
4761            strategy_id,
4762            account_id,
4763            side,
4764        )
4765    }
4766
4767    /// Returns the count of all closed positions.
4768    #[must_use]
4769    pub fn positions_closed_count(
4770        &self,
4771        venue: Option<&Venue>,
4772        instrument_id: Option<&InstrumentId>,
4773        strategy_id: Option<&StrategyId>,
4774        account_id: Option<&AccountId>,
4775        side: Option<PositionSide>,
4776    ) -> usize {
4777        self.count_positions_in_bucket(
4778            &self.index.positions_closed,
4779            venue,
4780            instrument_id,
4781            strategy_id,
4782            account_id,
4783            side,
4784        )
4785    }
4786
4787    /// Returns the count of all positions.
4788    #[must_use]
4789    pub fn positions_total_count(
4790        &self,
4791        venue: Option<&Venue>,
4792        instrument_id: Option<&InstrumentId>,
4793        strategy_id: Option<&StrategyId>,
4794        account_id: Option<&AccountId>,
4795        side: Option<PositionSide>,
4796    ) -> usize {
4797        self.count_positions_in_bucket(
4798            &self.index.positions,
4799            venue,
4800            instrument_id,
4801            strategy_id,
4802            account_id,
4803            side,
4804        )
4805    }
4806
4807    /// Returns whether any open position matches the optional filter parameters.
4808    ///
4809    /// Short-circuits on the first match, avoiding the full intersection walk performed by
4810    /// [`Self::positions_open_count`]. Prefer this over `positions_open_count(...) > 0` when
4811    /// only existence matters.
4812    #[must_use]
4813    pub fn has_positions_open(
4814        &self,
4815        venue: Option<&Venue>,
4816        instrument_id: Option<&InstrumentId>,
4817        strategy_id: Option<&StrategyId>,
4818        account_id: Option<&AccountId>,
4819        side: Option<PositionSide>,
4820    ) -> bool {
4821        self.any_positions_in_bucket(
4822            &self.index.positions_open,
4823            venue,
4824            instrument_id,
4825            strategy_id,
4826            account_id,
4827            side,
4828        )
4829    }
4830
4831    /// Returns whether any closed position matches the optional filter parameters.
4832    #[must_use]
4833    pub fn has_positions_closed(
4834        &self,
4835        venue: Option<&Venue>,
4836        instrument_id: Option<&InstrumentId>,
4837        strategy_id: Option<&StrategyId>,
4838        account_id: Option<&AccountId>,
4839        side: Option<PositionSide>,
4840    ) -> bool {
4841        self.any_positions_in_bucket(
4842            &self.index.positions_closed,
4843            venue,
4844            instrument_id,
4845            strategy_id,
4846            account_id,
4847            side,
4848        )
4849    }
4850
4851    /// Returns whether any position (open or closed) matches the optional filter parameters.
4852    #[must_use]
4853    pub fn has_positions(
4854        &self,
4855        venue: Option<&Venue>,
4856        instrument_id: Option<&InstrumentId>,
4857        strategy_id: Option<&StrategyId>,
4858        account_id: Option<&AccountId>,
4859        side: Option<PositionSide>,
4860    ) -> bool {
4861        self.any_positions_in_bucket(
4862            &self.index.positions,
4863            venue,
4864            instrument_id,
4865            strategy_id,
4866            account_id,
4867            side,
4868        )
4869    }
4870
4871    // -- STRATEGY QUERIES ------------------------------------------------------------------------
4872
4873    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
4874    #[must_use]
4875    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
4876        self.index.order_strategy.get(client_order_id)
4877    }
4878
4879    /// Gets a reference to the strategy ID for the `position_id` (if found).
4880    #[must_use]
4881    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
4882        self.index.position_strategy.get(position_id)
4883    }
4884
4885    // -- GENERAL ---------------------------------------------------------------------------------
4886
4887    /// Gets a reference to the general value for the `key` (if found).
4888    ///
4889    /// # Errors
4890    ///
4891    /// Returns an error if the `key` is invalid.
4892    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
4893        check_valid_string_ascii(key, stringify!(key))?;
4894
4895        Ok(self.general.get(key))
4896    }
4897
4898    // -- DATA QUERIES ----------------------------------------------------------------------------
4899
4900    /// Returns the price for the `instrument_id` and `price_type` (if found).
4901    #[must_use]
4902    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
4903        match price_type {
4904            PriceType::Bid => self
4905                .quotes
4906                .get(instrument_id)
4907                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
4908            PriceType::Ask => self
4909                .quotes
4910                .get(instrument_id)
4911                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
4912            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
4913                quotes.front().map(|quote| {
4914                    Price::new(
4915                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
4916                        quote.bid_price.precision + 1,
4917                    )
4918                })
4919            }),
4920            PriceType::Last => self
4921                .trades
4922                .get(instrument_id)
4923                .and_then(|trades| trades.front().map(|trade| trade.price)),
4924            PriceType::Mark => self
4925                .mark_prices
4926                .get(instrument_id)
4927                .and_then(|marks| marks.front().map(|mark| mark.value)),
4928        }
4929    }
4930
4931    /// Gets all quotes for the `instrument_id`.
4932    #[must_use]
4933    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
4934        self.quotes
4935            .get(instrument_id)
4936            .map(|quotes| quotes.iter().copied().collect())
4937    }
4938
4939    /// Gets all trades for the `instrument_id`.
4940    #[must_use]
4941    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
4942        self.trades
4943            .get(instrument_id)
4944            .map(|trades| trades.iter().copied().collect())
4945    }
4946
4947    /// Gets all mark price updates for the `instrument_id`.
4948    #[must_use]
4949    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
4950        self.mark_prices
4951            .get(instrument_id)
4952            .map(|mark_prices| mark_prices.iter().copied().collect())
4953    }
4954
4955    /// Gets all index price updates for the `instrument_id`.
4956    #[must_use]
4957    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
4958        self.index_prices
4959            .get(instrument_id)
4960            .map(|index_prices| index_prices.iter().copied().collect())
4961    }
4962
4963    /// Gets all funding rate updates for the `instrument_id`.
4964    #[must_use]
4965    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
4966        self.funding_rates
4967            .get(instrument_id)
4968            .map(|funding_rates| funding_rates.iter().copied().collect())
4969    }
4970
4971    /// Gets all instrument status updates for the `instrument_id`.
4972    #[must_use]
4973    pub fn instrument_statuses(
4974        &self,
4975        instrument_id: &InstrumentId,
4976    ) -> Option<Vec<InstrumentStatus>> {
4977        self.instrument_statuses
4978            .get(instrument_id)
4979            .map(|statuses| statuses.iter().copied().collect())
4980    }
4981
4982    /// Gets all bars for the `bar_type`.
4983    #[must_use]
4984    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
4985        self.bars
4986            .get(bar_type)
4987            .map(|bars| bars.iter().copied().collect())
4988    }
4989
4990    /// Gets a reference to the order book for the `instrument_id`.
4991    #[must_use]
4992    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
4993        self.books.get(instrument_id)
4994    }
4995
4996    /// Gets a reference to the order book for the `instrument_id`.
4997    #[must_use]
4998    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
4999        self.books.get_mut(instrument_id)
5000    }
5001
5002    /// Gets a reference to the own order book for the `instrument_id`.
5003    #[must_use]
5004    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
5005        self.own_books.get(instrument_id)
5006    }
5007
5008    /// Gets a reference to the own order book for the `instrument_id`.
5009    #[must_use]
5010    pub fn own_order_book_mut(
5011        &mut self,
5012        instrument_id: &InstrumentId,
5013    ) -> Option<&mut OwnOrderBook> {
5014        self.own_books.get_mut(instrument_id)
5015    }
5016
5017    /// Gets a reference to the latest quote for the `instrument_id`.
5018    #[must_use]
5019    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
5020        self.quotes
5021            .get(instrument_id)
5022            .and_then(|quotes| quotes.front())
5023    }
5024
5025    /// Gets a reference to the quote at `index` for the `instrument_id`.
5026    ///
5027    /// Index 0 is the most recent.
5028    #[must_use]
5029    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
5030        self.quotes
5031            .get(instrument_id)
5032            .and_then(|quotes| quotes.get(index))
5033    }
5034
5035    /// Gets a reference to the latest trade for the `instrument_id`.
5036    #[must_use]
5037    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
5038        self.trades
5039            .get(instrument_id)
5040            .and_then(|trades| trades.front())
5041    }
5042
5043    /// Gets a reference to the trade at `index` for the `instrument_id`.
5044    ///
5045    /// Index 0 is the most recent.
5046    #[must_use]
5047    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
5048        self.trades
5049            .get(instrument_id)
5050            .and_then(|trades| trades.get(index))
5051    }
5052
5053    /// Gets a reference to the latest mark price update for the `instrument_id`.
5054    #[must_use]
5055    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
5056        self.mark_prices
5057            .get(instrument_id)
5058            .and_then(|mark_prices| mark_prices.front())
5059    }
5060
5061    /// Gets a reference to the latest index price update for the `instrument_id`.
5062    #[must_use]
5063    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
5064        self.index_prices
5065            .get(instrument_id)
5066            .and_then(|index_prices| index_prices.front())
5067    }
5068
5069    /// Gets a reference to the latest funding rate update for the `instrument_id`.
5070    #[must_use]
5071    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
5072        self.funding_rates
5073            .get(instrument_id)
5074            .and_then(|funding_rates| funding_rates.front())
5075    }
5076
5077    /// Gets a reference to the latest instrument status update for the `instrument_id`.
5078    #[must_use]
5079    pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
5080        self.instrument_statuses
5081            .get(instrument_id)
5082            .and_then(|statuses| statuses.front())
5083    }
5084
5085    /// Gets a reference to the latest bar for the `bar_type`.
5086    #[must_use]
5087    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
5088        self.bars.get(bar_type).and_then(|bars| bars.front())
5089    }
5090
5091    /// Gets a reference to the bar at `index` for the `bar_type`.
5092    ///
5093    /// Index 0 is the most recent.
5094    #[must_use]
5095    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
5096        self.bars.get(bar_type).and_then(|bars| bars.get(index))
5097    }
5098
5099    /// Gets the order book update count for the `instrument_id`.
5100    #[must_use]
5101    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
5102        self.books
5103            .get(instrument_id)
5104            .map_or(0, |book| book.update_count) as usize
5105    }
5106
5107    /// Gets the quote tick count for the `instrument_id`.
5108    #[must_use]
5109    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
5110        self.quotes
5111            .get(instrument_id)
5112            .map_or(0, BoundedVecDeque::len)
5113    }
5114
5115    /// Gets the trade tick count for the `instrument_id`.
5116    #[must_use]
5117    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
5118        self.trades
5119            .get(instrument_id)
5120            .map_or(0, BoundedVecDeque::len)
5121    }
5122
5123    /// Gets the bar count for the `instrument_id`.
5124    #[must_use]
5125    pub fn bar_count(&self, bar_type: &BarType) -> usize {
5126        self.bars.get(bar_type).map_or(0, BoundedVecDeque::len)
5127    }
5128
5129    /// Returns whether the cache contains an order book for the `instrument_id`.
5130    #[must_use]
5131    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
5132        self.books.contains_key(instrument_id)
5133    }
5134
5135    /// Returns whether the cache contains quotes for the `instrument_id`.
5136    #[must_use]
5137    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
5138        self.quote_count(instrument_id) > 0
5139    }
5140
5141    /// Returns whether the cache contains trades for the `instrument_id`.
5142    #[must_use]
5143    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
5144        self.trade_count(instrument_id) > 0
5145    }
5146
5147    /// Returns whether the cache contains bars for the `bar_type`.
5148    #[must_use]
5149    pub fn has_bars(&self, bar_type: &BarType) -> bool {
5150        self.bar_count(bar_type) > 0
5151    }
5152
5153    #[must_use]
5154    pub fn get_xrate(
5155        &self,
5156        venue: Venue,
5157        from_currency: Currency,
5158        to_currency: Currency,
5159        price_type: PriceType,
5160    ) -> Option<Decimal> {
5161        if from_currency == to_currency {
5162            // When the source and target currencies are identical,
5163            // no conversion is needed; return an exchange rate of one.
5164            return Some(Decimal::ONE);
5165        }
5166
5167        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
5168
5169        match get_exchange_rate(
5170            from_currency.code,
5171            to_currency.code,
5172            price_type,
5173            bid_quote,
5174            ask_quote,
5175        ) {
5176            Ok(rate) => rate,
5177            Err(e) => {
5178                log::error!("Failed to calculate xrate: {e}");
5179                None
5180            }
5181        }
5182    }
5183
5184    fn build_quote_table(
5185        &self,
5186        venue: &Venue,
5187    ) -> (AHashMap<Ustr, Decimal>, AHashMap<Ustr, Decimal>) {
5188        let mut bid_quotes = AHashMap::new();
5189        let mut ask_quotes = AHashMap::new();
5190
5191        for instrument_id in self.instruments.keys() {
5192            if instrument_id.venue != *venue {
5193                continue;
5194            }
5195
5196            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
5197                if let Some(tick) = ticks.front() {
5198                    (tick.bid_price, tick.ask_price)
5199                } else {
5200                    continue; // Empty ticks vector
5201                }
5202            } else {
5203                let bid_bar = self
5204                    .bars
5205                    .iter()
5206                    .find(|(k, _)| {
5207                        k.instrument_id() == *instrument_id
5208                            && matches!(k.spec().price_type, PriceType::Bid)
5209                    })
5210                    .map(|(_, v)| v);
5211
5212                let ask_bar = self
5213                    .bars
5214                    .iter()
5215                    .find(|(k, _)| {
5216                        k.instrument_id() == *instrument_id
5217                            && matches!(k.spec().price_type, PriceType::Ask)
5218                    })
5219                    .map(|(_, v)| v);
5220
5221                match (bid_bar, ask_bar) {
5222                    (Some(bid), Some(ask)) => {
5223                        match (bid.front(), ask.front()) {
5224                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
5225                            _ => {
5226                                // Empty bar VecDeques
5227                                continue;
5228                            }
5229                        }
5230                    }
5231                    _ => continue,
5232                }
5233            };
5234
5235            bid_quotes.insert(instrument_id.symbol.inner(), bid_price.as_decimal());
5236            ask_quotes.insert(instrument_id.symbol.inner(), ask_price.as_decimal());
5237        }
5238
5239        (bid_quotes, ask_quotes)
5240    }
5241
5242    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
5243    #[must_use]
5244    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
5245        self.mark_xrates.get(&(from_currency, to_currency)).copied()
5246    }
5247
5248    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
5249    ///
5250    /// # Panics
5251    ///
5252    /// Panics if `xrate` is not positive.
5253    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
5254        assert!(xrate > 0.0, "xrate was zero");
5255        self.mark_xrates.insert((from_currency, to_currency), xrate);
5256        self.mark_xrates
5257            .insert((to_currency, from_currency), 1.0 / xrate);
5258    }
5259
5260    /// Clears the mark exchange rate for the given currency pair.
5261    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
5262        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
5263    }
5264
5265    /// Clears all mark exchange rates.
5266    pub fn clear_mark_xrates(&mut self) {
5267        self.mark_xrates.clear();
5268    }
5269
5270    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
5271
5272    /// Returns a reference to the instrument for the `instrument_id` (if found).
5273    #[must_use]
5274    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
5275        self.instruments.get(instrument_id)
5276    }
5277
5278    /// Returns references to all instrument IDs for the `venue`.
5279    #[must_use]
5280    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
5281        match venue {
5282            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
5283            None => self.instruments.keys().collect(),
5284        }
5285    }
5286
5287    /// Returns references to all instruments for the `venue`.
5288    #[must_use]
5289    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
5290        self.instruments
5291            .values()
5292            .filter(|i| &i.id().venue == venue)
5293            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
5294            .collect()
5295    }
5296
5297    /// Returns references to all instruments for the `venue` whose underlying
5298    /// equals `root` and whose [`InstrumentClass`] equals `class`.
5299    ///
5300    /// Use when expanding a parent-symbol subscription: filtering by class as
5301    /// well as root prevents leaves of a different class (e.g. options when
5302    /// the user asked for futures, or vice versa) from being pulled in.
5303    #[must_use]
5304    pub fn instruments_by_parent(
5305        &self,
5306        venue: &Venue,
5307        root: &Ustr,
5308        class: InstrumentClass,
5309    ) -> Vec<&InstrumentAny> {
5310        self.instruments
5311            .values()
5312            .filter(|i| &i.id().venue == venue)
5313            .filter(|i| i.underlying() == Some(*root))
5314            .filter(|i| i.instrument_class() == class)
5315            .collect()
5316    }
5317
5318    /// Returns references to all bar types contained in the cache.
5319    #[must_use]
5320    pub fn bar_types(
5321        &self,
5322        instrument_id: Option<&InstrumentId>,
5323        price_type: Option<&PriceType>,
5324        aggregation_source: AggregationSource,
5325    ) -> Vec<&BarType> {
5326        let mut bar_types = self
5327            .bars
5328            .keys()
5329            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
5330            .collect::<Vec<&BarType>>();
5331
5332        if let Some(instrument_id) = instrument_id {
5333            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
5334        }
5335
5336        if let Some(price_type) = price_type {
5337            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
5338        }
5339
5340        bar_types
5341    }
5342
5343    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
5344
5345    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
5346    #[must_use]
5347    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
5348        self.synthetics.get(instrument_id)
5349    }
5350
5351    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
5352    #[must_use]
5353    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
5354        self.synthetics.keys().collect()
5355    }
5356
5357    /// Returns references to all synthetic instruments contained in the cache.
5358    #[must_use]
5359    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
5360        self.synthetics.values().collect()
5361    }
5362
5363    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
5364
5365    /// Returns a borrow of the account for the `account_id` (if found).
5366    #[must_use]
5367    pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
5368        self.accounts
5369            .get(account_id)
5370            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5371    }
5372
5373    /// Gets an exclusive write borrow of the account with the `account_id` (if found).
5374    ///
5375    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
5376    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
5377    /// exposes immutable cache borrows and therefore cannot reach this method.
5378    ///
5379    /// While the returned [`AccountRefMut`] is alive, no other read or write of the same account
5380    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
5381    /// that may re-enter the same account.
5382    #[must_use]
5383    pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
5384        self.accounts
5385            .get(account_id)
5386            .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
5387    }
5388
5389    /// Gets an owned snapshot of the account with the `account_id` (if found).
5390    ///
5391    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
5392    /// will not reflect later cache mutations.
5393    #[must_use]
5394    pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
5395        self.accounts
5396            .get(account_id)
5397            .map(|account_cell| account_cell.borrow().clone())
5398    }
5399
5400    /// Returns a borrow of the account for the `venue` (if found).
5401    #[must_use]
5402    pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
5403        self.index
5404            .venue_account
5405            .get(venue)
5406            .and_then(|account_id| self.accounts.get(account_id))
5407            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5408    }
5409
5410    /// Returns an owned snapshot of the account for the `venue` (if found).
5411    ///
5412    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
5413    /// will not reflect later cache mutations.
5414    #[must_use]
5415    pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
5416        self.index
5417            .venue_account
5418            .get(venue)
5419            .and_then(|account_id| self.accounts.get(account_id))
5420            .map(|account_cell| account_cell.borrow().clone())
5421    }
5422
5423    /// Returns a reference to the account ID for the `venue` (if found).
5424    #[must_use]
5425    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
5426        self.index.venue_account.get(venue)
5427    }
5428
5429    /// Returns borrows of all accounts for the `account_id`.
5430    ///
5431    /// Each [`AccountRef`] in the returned vector borrows its underlying cell; mutating any of
5432    /// those accounts while the vector is alive will panic at runtime. Drop the vector before
5433    /// issuing writes.
5434    #[must_use]
5435    pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
5436        self.accounts
5437            .values()
5438            .filter(|account_cell| &account_cell.borrow().id() == account_id)
5439            .map(|account_cell| AccountRef::new(account_cell.borrow()))
5440            .collect()
5441    }
5442
5443    /// Updates the own order book with an order.
5444    ///
5445    /// This method adds, updates, or removes an order from the own order book
5446    /// based on the order's current state.
5447    ///
5448    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
5449    /// represented in own books.
5450    pub fn update_own_order_book(&mut self, order: &OrderAny) {
5451        if !order.has_price() {
5452            return;
5453        }
5454
5455        let instrument_id = order.instrument_id();
5456
5457        if !self.own_books.contains_key(&instrument_id) {
5458            if order.is_closed() {
5459                return;
5460            }
5461
5462            self.own_books
5463                .insert(instrument_id, OwnOrderBook::new(instrument_id));
5464        }
5465
5466        let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
5467            return;
5468        };
5469
5470        let own_book_order = order.to_own_book_order();
5471
5472        if order.is_closed() {
5473            if let Err(e) = own_book.delete(own_book_order) {
5474                log::debug!(
5475                    "Failed to delete order {} from own book: {e}",
5476                    order.client_order_id(),
5477                );
5478            } else {
5479                log::debug!("Deleted order {} from own book", order.client_order_id());
5480            }
5481        } else {
5482            // Add or update the order in the own book
5483            if let Err(e) = own_book.update(own_book_order) {
5484                log::debug!(
5485                    "Failed to update order {} in own book: {e}; inserting instead",
5486                    order.client_order_id(),
5487                );
5488                own_book.add(own_book_order);
5489            }
5490            log::debug!("Updated order {} in own book", order.client_order_id());
5491        }
5492    }
5493
5494    /// Force removal of an order from own order books and clean up all indexes.
5495    ///
5496    /// This method is used when order event application fails and we need to ensure
5497    /// terminal orders are properly cleaned up from own books and all relevant indexes.
5498    /// Replicates the index cleanup that `update_order` performs for closed orders.
5499    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
5500        let Some(order_cell) = self.orders.get(client_order_id) else {
5501            return;
5502        };
5503        let order = order_cell.borrow();
5504        let instrument_id = order.instrument_id();
5505        let own_book_order = if order.has_price() {
5506            Some(order.to_own_book_order())
5507        } else {
5508            None
5509        };
5510        drop(order);
5511
5512        self.index.orders_open.remove(client_order_id);
5513        self.index.orders_pending_cancel.remove(client_order_id);
5514        self.index.orders_inflight.remove(client_order_id);
5515        self.index.orders_emulated.remove(client_order_id);
5516        self.index.orders_active_local.remove(client_order_id);
5517
5518        if let Some(own_book) = self.own_books.get_mut(&instrument_id)
5519            && let Some(own_book_order) = own_book_order
5520        {
5521            if let Err(e) = own_book.delete(own_book_order) {
5522                log::debug!("Could not force delete {client_order_id} from own book: {e}");
5523            } else {
5524                log::debug!("Force deleted {client_order_id} from own book");
5525            }
5526        }
5527
5528        self.index.orders_closed.insert(*client_order_id);
5529    }
5530
5531    /// Audit all own order books against open and inflight order indexes.
5532    ///
5533    /// Ensures closed orders are removed from own order books. This includes both
5534    /// orders tracked in `orders_open` (`ACCEPTED`, `TRIGGERED`, `PENDING_*`, `PARTIALLY_FILLED`)
5535    /// and `orders_inflight` (`INITIALIZED`, `SUBMITTED`) to prevent false positives
5536    /// during venue latency windows.
5537    pub fn audit_own_order_books(&mut self) {
5538        log::debug!("Starting own books audit");
5539        let start = std::time::Instant::now();
5540
5541        // Build union of open and inflight orders for audit,
5542        // this prevents false positives for SUBMITTED orders during venue latency.
5543        let valid_order_ids: AHashSet<ClientOrderId> = self
5544            .index
5545            .orders_open
5546            .union(&self.index.orders_inflight)
5547            .copied()
5548            .collect();
5549
5550        for own_book in self.own_books.values_mut() {
5551            own_book.audit_open_orders(&valid_order_ids);
5552        }
5553
5554        log::debug!("Completed own books audit in {:?}", start.elapsed());
5555    }
5556}
5557
5558fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
5559    let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
5560        anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
5561    };
5562
5563    let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
5564        anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
5565    };
5566
5567    if position_id.is_empty() {
5568        anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
5569    }
5570
5571    let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
5572        anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
5573    })?;
5574
5575    Ok((PositionId::new(position_id), snapshot_index))
5576}
5577
5578fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
5579    let snapshot = serde_json::from_slice::<Position>(blob)?;
5580    let expected_prefix = format!("{}-", position_id.as_str());
5581
5582    let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
5583        anyhow::bail!(
5584            "position snapshot id {} does not match blob_ref position {position_id}",
5585            snapshot.id
5586        );
5587    };
5588
5589    if UUID4::from_str(snapshot_uuid).is_err() {
5590        anyhow::bail!(
5591            "position snapshot id {} does not match blob_ref position {position_id}",
5592            snapshot.id
5593        );
5594    }
5595
5596    Ok(())
5597}