Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22mod error;
23pub mod fifo;
24pub mod quote;
25pub mod refs;
26
27mod bounded;
28mod index;
29
30#[cfg(test)]
31mod tests;
32
33use std::{
34    borrow::Cow,
35    cell::{Ref, RefCell},
36    fmt::{Debug, Display},
37    rc::Rc,
38    str::FromStr,
39    time::{SystemTime, UNIX_EPOCH},
40};
41
42use ahash::{AHashMap, AHashSet};
43use bounded::BoundedVecDeque;
44use bytes::Bytes;
45pub use config::CacheConfig; // Re-export
46use database::{CacheDatabaseAdapter, CacheMap};
47pub use error::{
48    ACCOUNT_NOT_FOUND, AccountLookupError, CURRENCY_NOT_FOUND, CurrencyLookupError,
49    INSTRUMENT_NOT_FOUND, InstrumentLookupError, ORDER_BOOK_NOT_FOUND, ORDER_LIST_NOT_FOUND,
50    ORDER_NOT_FOUND, OWN_ORDER_BOOK_NOT_FOUND, OrderBookLookupError, OrderListLookupError,
51    OrderLookupError, OwnOrderBookLookupError, POSITION_NOT_FOUND, PositionLookupError,
52    SYNTHETIC_INSTRUMENT_NOT_FOUND, SyntheticInstrumentLookupError,
53};
54use index::CacheIndex;
55use nautilus_core::{
56    SharedCell, UUID4, UnixNanos,
57    correctness::{
58        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
59        check_valid_string_ascii,
60    },
61    datetime::secs_to_nanos_unchecked,
62};
63#[cfg(feature = "defi")]
64use nautilus_model::defi::{Pool, PoolProfiler};
65use nautilus_model::{
66    accounts::{Account, AccountAny},
67    data::{
68        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
69        MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
70    },
71    enums::{
72        AggregationSource, ContingencyType, InstrumentClass, OmsType, OrderSide, PositionSide,
73        PriceType, TriggerType,
74    },
75    events::{AccountState, OrderEventAny},
76    identifiers::{
77        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
78        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
79    },
80    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81    orderbook::{
82        OrderBook,
83        own::{OwnOrderBook, should_handle_own_book_order},
84    },
85    orders::{Order, OrderAny, OrderError, OrderList},
86    position::Position,
87    types::{Currency, Money, Price, Quantity},
88};
89pub use refs::{AccountRef, AccountRefMut, OrderRef, OrderRefMut, PositionRef, PositionRefMut};
90use rust_decimal::Decimal;
91use ustr::Ustr;
92
93use crate::xrate::get_exchange_rate;
94
95/// Cache-owned reference to a snapshot blob.
96///
97/// The cache writes and later fetches the blob; external systems persist this opaque reference
98/// and may hash the bytes before recording a durable anchor.
99#[derive(Clone, Debug, PartialEq, Eq)]
100pub struct CacheSnapshotRef {
101    /// Opaque cache-owned snapshot location.
102    pub blob_ref: String,
103    /// Snapshot bytes stored under [`Self::blob_ref`].
104    pub blob: Bytes,
105}
106
107impl CacheSnapshotRef {
108    /// Creates a new [`CacheSnapshotRef`].
109    #[must_use]
110    pub fn new(blob_ref: impl Into<String>, blob: impl Into<Bytes>) -> Self {
111        Self {
112            blob_ref: blob_ref.into(),
113            blob: blob.into(),
114        }
115    }
116}
117
118// TODO: Reassess whether CacheView should consolidate with CacheApi once adapter and client
119// construction no longer need a cache-handle facade.
120/// Read-only view over the platform cache.
121///
122/// Adapter-facing code receives this type instead of the mutable cache handle so cache writes stay
123/// owned by the data and execution engines.
124#[derive(Clone, Debug)]
125pub struct CacheView {
126    inner: Rc<RefCell<Cache>>,
127}
128
129impl CacheView {
130    /// Creates a new [`CacheView`] from a cache handle.
131    #[must_use]
132    pub fn new(inner: Rc<RefCell<Cache>>) -> Self {
133        Self { inner }
134    }
135
136    /// Borrows the cache immutably.
137    ///
138    /// # Panics
139    ///
140    /// Panics if the cache is already mutably borrowed.
141    pub fn borrow(&self) -> Ref<'_, Cache> {
142        self.inner.borrow()
143    }
144}
145
146impl From<Rc<RefCell<Cache>>> for CacheView {
147    fn from(inner: Rc<RefCell<Cache>>) -> Self {
148        Self::new(inner)
149    }
150}
151
152/// User-facing cache API.
153///
154/// Point reads return owned snapshots where possible, so actor code does not retain a `Ref` into
155/// the live [`Cache`]. Plural collection reads return owned snapshots of all matching values and
156/// are intentionally named as bulk reads. Prefer the count, ID, or `has_*` methods in hot paths
157/// when a full snapshot is not needed.
158#[derive(Debug)]
159pub struct CacheApi<'a> {
160    cache: &'a RefCell<Cache>,
161}
162
163impl<'a> CacheApi<'a> {
164    pub(crate) fn new(cache: &'a RefCell<Cache>) -> Self {
165        Self { cache }
166    }
167
168    /// Returns the unrealized PnL for the `position` using cached market data.
169    ///
170    /// # Panics
171    ///
172    /// Panics if the cache is already mutably borrowed.
173    #[must_use]
174    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
175        self.cache().calculate_unrealized_pnl(position)
176    }
177
178    /// Returns the OMS type for the `position_id` (if known).
179    ///
180    /// # Panics
181    ///
182    /// Panics if the cache is already mutably borrowed.
183    #[must_use]
184    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
185        self.cache().oms_type(position_id)
186    }
187
188    /// Returns serialized position snapshot frames for the `position_id`.
189    ///
190    /// # Panics
191    ///
192    /// Panics if the cache is already mutably borrowed.
193    #[must_use]
194    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
195        self.cache().position_snapshot_bytes(position_id)
196    }
197
198    /// Returns the number of stored position snapshots for the `position_id`.
199    ///
200    /// # Panics
201    ///
202    /// Panics if the cache is already mutably borrowed.
203    #[must_use]
204    pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
205        self.cache().position_snapshot_count(position_id)
206    }
207
208    /// Returns position snapshots matching the optional filters.
209    ///
210    /// # Panics
211    ///
212    /// Panics if the cache is already mutably borrowed.
213    #[must_use]
214    pub fn position_snapshots(
215        &self,
216        position_id: Option<&PositionId>,
217        account_id: Option<&AccountId>,
218    ) -> Vec<Position> {
219        self.cache().position_snapshots(position_id, account_id)
220    }
221
222    /// Returns position snapshots for `position_id` starting from `skip`.
223    ///
224    /// # Panics
225    ///
226    /// Panics if the cache is already mutably borrowed.
227    #[must_use]
228    pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
229        self.cache().position_snapshots_from(position_id, skip)
230    }
231
232    /// Returns position snapshot IDs for the `instrument_id`.
233    ///
234    /// # Panics
235    ///
236    /// Panics if the cache is already mutably borrowed.
237    #[must_use]
238    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
239        self.cache().position_snapshot_ids(instrument_id)
240    }
241
242    /// Returns the client order IDs of all orders matching the optional filter parameters.
243    ///
244    /// # Panics
245    ///
246    /// Panics if the cache is already mutably borrowed.
247    #[must_use]
248    pub fn client_order_ids(
249        &self,
250        venue: Option<&Venue>,
251        instrument_id: Option<&InstrumentId>,
252        strategy_id: Option<&StrategyId>,
253        account_id: Option<&AccountId>,
254    ) -> AHashSet<ClientOrderId> {
255        self.cache()
256            .client_order_ids(venue, instrument_id, strategy_id, account_id)
257    }
258
259    /// Returns the client order IDs of all open orders matching the optional filter parameters.
260    ///
261    /// # Panics
262    ///
263    /// Panics if the cache is already mutably borrowed.
264    #[must_use]
265    pub fn client_order_ids_open(
266        &self,
267        venue: Option<&Venue>,
268        instrument_id: Option<&InstrumentId>,
269        strategy_id: Option<&StrategyId>,
270        account_id: Option<&AccountId>,
271    ) -> AHashSet<ClientOrderId> {
272        self.cache()
273            .client_order_ids_open(venue, instrument_id, strategy_id, account_id)
274    }
275
276    /// Returns the client order IDs of all closed orders matching the optional filter parameters.
277    ///
278    /// # Panics
279    ///
280    /// Panics if the cache is already mutably borrowed.
281    #[must_use]
282    pub fn client_order_ids_closed(
283        &self,
284        venue: Option<&Venue>,
285        instrument_id: Option<&InstrumentId>,
286        strategy_id: Option<&StrategyId>,
287        account_id: Option<&AccountId>,
288    ) -> AHashSet<ClientOrderId> {
289        self.cache()
290            .client_order_ids_closed(venue, instrument_id, strategy_id, account_id)
291    }
292
293    /// Returns the client order IDs of all locally active orders matching the optional filter parameters.
294    ///
295    /// # Panics
296    ///
297    /// Panics if the cache is already mutably borrowed.
298    #[must_use]
299    pub fn client_order_ids_active_local(
300        &self,
301        venue: Option<&Venue>,
302        instrument_id: Option<&InstrumentId>,
303        strategy_id: Option<&StrategyId>,
304        account_id: Option<&AccountId>,
305    ) -> AHashSet<ClientOrderId> {
306        self.cache()
307            .client_order_ids_active_local(venue, instrument_id, strategy_id, account_id)
308    }
309
310    /// Returns the client order IDs of all emulated orders matching the optional filter parameters.
311    ///
312    /// # Panics
313    ///
314    /// Panics if the cache is already mutably borrowed.
315    #[must_use]
316    pub fn client_order_ids_emulated(
317        &self,
318        venue: Option<&Venue>,
319        instrument_id: Option<&InstrumentId>,
320        strategy_id: Option<&StrategyId>,
321        account_id: Option<&AccountId>,
322    ) -> AHashSet<ClientOrderId> {
323        self.cache()
324            .client_order_ids_emulated(venue, instrument_id, strategy_id, account_id)
325    }
326
327    /// Returns the client order IDs of all in-flight orders matching the optional filter parameters.
328    ///
329    /// # Panics
330    ///
331    /// Panics if the cache is already mutably borrowed.
332    #[must_use]
333    pub fn client_order_ids_inflight(
334        &self,
335        venue: Option<&Venue>,
336        instrument_id: Option<&InstrumentId>,
337        strategy_id: Option<&StrategyId>,
338        account_id: Option<&AccountId>,
339    ) -> AHashSet<ClientOrderId> {
340        self.cache()
341            .client_order_ids_inflight(venue, instrument_id, strategy_id, account_id)
342    }
343
344    /// Returns the position IDs of all positions matching the optional filter parameters.
345    ///
346    /// # Panics
347    ///
348    /// Panics if the cache is already mutably borrowed.
349    #[must_use]
350    pub fn position_ids(
351        &self,
352        venue: Option<&Venue>,
353        instrument_id: Option<&InstrumentId>,
354        strategy_id: Option<&StrategyId>,
355        account_id: Option<&AccountId>,
356    ) -> AHashSet<PositionId> {
357        self.cache()
358            .position_ids(venue, instrument_id, strategy_id, account_id)
359    }
360
361    /// Returns the position IDs of all open positions matching the optional filter parameters.
362    ///
363    /// # Panics
364    ///
365    /// Panics if the cache is already mutably borrowed.
366    #[must_use]
367    pub fn position_open_ids(
368        &self,
369        venue: Option<&Venue>,
370        instrument_id: Option<&InstrumentId>,
371        strategy_id: Option<&StrategyId>,
372        account_id: Option<&AccountId>,
373    ) -> AHashSet<PositionId> {
374        self.cache()
375            .position_open_ids(venue, instrument_id, strategy_id, account_id)
376    }
377
378    /// Returns the position IDs of all closed positions matching the optional filter parameters.
379    ///
380    /// # Panics
381    ///
382    /// Panics if the cache is already mutably borrowed.
383    #[must_use]
384    pub fn position_closed_ids(
385        &self,
386        venue: Option<&Venue>,
387        instrument_id: Option<&InstrumentId>,
388        strategy_id: Option<&StrategyId>,
389        account_id: Option<&AccountId>,
390    ) -> AHashSet<PositionId> {
391        self.cache()
392            .position_closed_ids(venue, instrument_id, strategy_id, account_id)
393    }
394
395    /// Returns the actor IDs in the cache.
396    ///
397    /// # Panics
398    ///
399    /// Panics if the cache is already mutably borrowed.
400    #[must_use]
401    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
402        self.cache().actor_ids()
403    }
404
405    /// Returns the strategy IDs in the cache.
406    ///
407    /// # Panics
408    ///
409    /// Panics if the cache is already mutably borrowed.
410    #[must_use]
411    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
412        self.cache().strategy_ids()
413    }
414
415    /// Returns the execution algorithm IDs in the cache.
416    ///
417    /// # Panics
418    ///
419    /// Panics if the cache is already mutably borrowed.
420    #[must_use]
421    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
422        self.cache().exec_algorithm_ids()
423    }
424
425    /// Returns an owned copy of the order for the `client_order_id` (if found).
426    ///
427    /// # Panics
428    ///
429    /// Panics if the cache is already mutably borrowed.
430    #[must_use]
431    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
432        self.cache().order_owned(client_order_id)
433    }
434
435    // panics-doc-ok
436    /// Returns an owned copy of the order for the `client_order_id`.
437    ///
438    /// # Errors
439    ///
440    /// Returns [`OrderLookupError::NotFound`] when the order is not present in the cache.
441    ///
442    /// # Panics
443    ///
444    /// Panics if the cache is already mutably borrowed.
445    pub fn try_order(&self, client_order_id: &ClientOrderId) -> Result<OrderAny, OrderLookupError> {
446        self.cache().try_order_owned(client_order_id)
447    }
448
449    /// Returns owned copies of the orders for `client_order_ids`.
450    ///
451    /// # Panics
452    ///
453    /// Panics if the cache is already mutably borrowed.
454    #[must_use]
455    pub fn orders_for_ids(
456        &self,
457        client_order_ids: &[ClientOrderId],
458        context: &dyn Display,
459    ) -> Vec<OrderAny> {
460        self.cache().orders_for_ids(client_order_ids, context)
461    }
462
463    /// Returns the client order ID for the `venue_order_id` (if found).
464    ///
465    /// # Panics
466    ///
467    /// Panics if the cache is already mutably borrowed.
468    #[must_use]
469    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<ClientOrderId> {
470        self.cache().client_order_id(venue_order_id).copied()
471    }
472
473    /// Returns the venue order ID for the `client_order_id` (if found).
474    ///
475    /// # Panics
476    ///
477    /// Panics if the cache is already mutably borrowed.
478    #[must_use]
479    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
480        self.cache().venue_order_id(client_order_id).copied()
481    }
482
483    /// Returns the client ID indexed for the `client_order_id` (if found).
484    ///
485    /// # Panics
486    ///
487    /// Panics if the cache is already mutably borrowed.
488    #[must_use]
489    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<ClientId> {
490        self.cache().client_id(client_order_id).copied()
491    }
492
493    /// Returns owned copies of all orders matching the optional filter parameters.
494    ///
495    /// # Panics
496    ///
497    /// Panics if the cache is already mutably borrowed.
498    #[must_use]
499    pub fn orders(
500        &self,
501        venue: Option<&Venue>,
502        instrument_id: Option<&InstrumentId>,
503        strategy_id: Option<&StrategyId>,
504        account_id: Option<&AccountId>,
505        side: Option<OrderSide>,
506    ) -> Vec<OrderAny> {
507        self.cache()
508            .orders_refs(venue, instrument_id, strategy_id, account_id, side)
509            .into_iter()
510            .map(|order| order.cloned())
511            .collect()
512    }
513
514    /// Returns owned copies of all open orders matching the optional filter parameters.
515    ///
516    /// # Panics
517    ///
518    /// Panics if the cache is already mutably borrowed.
519    #[must_use]
520    pub fn orders_open(
521        &self,
522        venue: Option<&Venue>,
523        instrument_id: Option<&InstrumentId>,
524        strategy_id: Option<&StrategyId>,
525        account_id: Option<&AccountId>,
526        side: Option<OrderSide>,
527    ) -> Vec<OrderAny> {
528        self.cache()
529            .orders_open_refs(venue, instrument_id, strategy_id, account_id, side)
530            .into_iter()
531            .map(|order| order.cloned())
532            .collect()
533    }
534
535    /// Returns owned copies of all closed orders matching the optional filter parameters.
536    ///
537    /// # Panics
538    ///
539    /// Panics if the cache is already mutably borrowed.
540    #[must_use]
541    pub fn orders_closed(
542        &self,
543        venue: Option<&Venue>,
544        instrument_id: Option<&InstrumentId>,
545        strategy_id: Option<&StrategyId>,
546        account_id: Option<&AccountId>,
547        side: Option<OrderSide>,
548    ) -> Vec<OrderAny> {
549        self.cache()
550            .orders_closed_refs(venue, instrument_id, strategy_id, account_id, side)
551            .into_iter()
552            .map(|order| order.cloned())
553            .collect()
554    }
555
556    /// Returns owned copies of all locally active orders matching the optional filter parameters.
557    ///
558    /// # Panics
559    ///
560    /// Panics if the cache is already mutably borrowed.
561    #[must_use]
562    pub fn orders_active_local(
563        &self,
564        venue: Option<&Venue>,
565        instrument_id: Option<&InstrumentId>,
566        strategy_id: Option<&StrategyId>,
567        account_id: Option<&AccountId>,
568        side: Option<OrderSide>,
569    ) -> Vec<OrderAny> {
570        self.cache()
571            .orders_active_local_refs(venue, instrument_id, strategy_id, account_id, side)
572            .into_iter()
573            .map(|order| order.cloned())
574            .collect()
575    }
576
577    /// Returns owned copies of all emulated orders matching the optional filter parameters.
578    ///
579    /// # Panics
580    ///
581    /// Panics if the cache is already mutably borrowed.
582    #[must_use]
583    pub fn orders_emulated(
584        &self,
585        venue: Option<&Venue>,
586        instrument_id: Option<&InstrumentId>,
587        strategy_id: Option<&StrategyId>,
588        account_id: Option<&AccountId>,
589        side: Option<OrderSide>,
590    ) -> Vec<OrderAny> {
591        self.cache()
592            .orders_emulated_refs(venue, instrument_id, strategy_id, account_id, side)
593            .into_iter()
594            .map(|order| order.cloned())
595            .collect()
596    }
597
598    /// Returns owned copies of all in-flight orders matching the optional filter parameters.
599    ///
600    /// # Panics
601    ///
602    /// Panics if the cache is already mutably borrowed.
603    #[must_use]
604    pub fn orders_inflight(
605        &self,
606        venue: Option<&Venue>,
607        instrument_id: Option<&InstrumentId>,
608        strategy_id: Option<&StrategyId>,
609        account_id: Option<&AccountId>,
610        side: Option<OrderSide>,
611    ) -> Vec<OrderAny> {
612        self.cache()
613            .orders_inflight_refs(venue, instrument_id, strategy_id, account_id, side)
614            .into_iter()
615            .map(|order| order.cloned())
616            .collect()
617    }
618
619    /// Returns owned copies of all orders for the `position_id`.
620    ///
621    /// # Panics
622    ///
623    /// Panics if the cache is already mutably borrowed.
624    #[must_use]
625    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderAny> {
626        self.cache()
627            .orders_for_position(position_id)
628            .into_iter()
629            .map(|order| order.cloned())
630            .collect()
631    }
632
633    /// Returns whether an order with the `client_order_id` exists.
634    ///
635    /// # Panics
636    ///
637    /// Panics if the cache is already mutably borrowed.
638    #[must_use]
639    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
640        self.cache().order_exists(client_order_id)
641    }
642
643    /// Returns whether an order with the `client_order_id` is open.
644    ///
645    /// # Panics
646    ///
647    /// Panics if the cache is already mutably borrowed.
648    #[must_use]
649    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
650        self.cache().is_order_open(client_order_id)
651    }
652
653    /// Returns whether an order with the `client_order_id` is closed.
654    ///
655    /// # Panics
656    ///
657    /// Panics if the cache is already mutably borrowed.
658    #[must_use]
659    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
660        self.cache().is_order_closed(client_order_id)
661    }
662
663    /// Returns whether an order with the `client_order_id` is locally active.
664    ///
665    /// # Panics
666    ///
667    /// Panics if the cache is already mutably borrowed.
668    #[must_use]
669    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
670        self.cache().is_order_active_local(client_order_id)
671    }
672
673    /// Returns whether an order with the `client_order_id` is emulated.
674    ///
675    /// # Panics
676    ///
677    /// Panics if the cache is already mutably borrowed.
678    #[must_use]
679    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
680        self.cache().is_order_emulated(client_order_id)
681    }
682
683    /// Returns whether an order with the `client_order_id` is in-flight.
684    ///
685    /// # Panics
686    ///
687    /// Panics if the cache is already mutably borrowed.
688    #[must_use]
689    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
690        self.cache().is_order_inflight(client_order_id)
691    }
692
693    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
694    ///
695    /// # Panics
696    ///
697    /// Panics if the cache is already mutably borrowed.
698    #[must_use]
699    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
700        self.cache().is_order_pending_cancel_local(client_order_id)
701    }
702
703    /// Returns the count of all open orders matching the optional filter parameters.
704    ///
705    /// # Panics
706    ///
707    /// Panics if the cache is already mutably borrowed.
708    #[must_use]
709    pub fn orders_open_count(
710        &self,
711        venue: Option<&Venue>,
712        instrument_id: Option<&InstrumentId>,
713        strategy_id: Option<&StrategyId>,
714        account_id: Option<&AccountId>,
715        side: Option<OrderSide>,
716    ) -> usize {
717        self.cache()
718            .orders_open_count(venue, instrument_id, strategy_id, account_id, side)
719    }
720
721    /// Returns the count of all closed orders matching the optional filter parameters.
722    ///
723    /// # Panics
724    ///
725    /// Panics if the cache is already mutably borrowed.
726    #[must_use]
727    pub fn orders_closed_count(
728        &self,
729        venue: Option<&Venue>,
730        instrument_id: Option<&InstrumentId>,
731        strategy_id: Option<&StrategyId>,
732        account_id: Option<&AccountId>,
733        side: Option<OrderSide>,
734    ) -> usize {
735        self.cache()
736            .orders_closed_count(venue, instrument_id, strategy_id, account_id, side)
737    }
738
739    /// Returns the count of all locally active orders matching the optional filter parameters.
740    ///
741    /// # Panics
742    ///
743    /// Panics if the cache is already mutably borrowed.
744    #[must_use]
745    pub fn orders_active_local_count(
746        &self,
747        venue: Option<&Venue>,
748        instrument_id: Option<&InstrumentId>,
749        strategy_id: Option<&StrategyId>,
750        account_id: Option<&AccountId>,
751        side: Option<OrderSide>,
752    ) -> usize {
753        self.cache()
754            .orders_active_local_count(venue, instrument_id, strategy_id, account_id, side)
755    }
756
757    /// Returns the count of all emulated orders matching the optional filter parameters.
758    ///
759    /// # Panics
760    ///
761    /// Panics if the cache is already mutably borrowed.
762    #[must_use]
763    pub fn orders_emulated_count(
764        &self,
765        venue: Option<&Venue>,
766        instrument_id: Option<&InstrumentId>,
767        strategy_id: Option<&StrategyId>,
768        account_id: Option<&AccountId>,
769        side: Option<OrderSide>,
770    ) -> usize {
771        self.cache()
772            .orders_emulated_count(venue, instrument_id, strategy_id, account_id, side)
773    }
774
775    /// Returns the count of all in-flight orders matching the optional filter parameters.
776    ///
777    /// # Panics
778    ///
779    /// Panics if the cache is already mutably borrowed.
780    #[must_use]
781    pub fn orders_inflight_count(
782        &self,
783        venue: Option<&Venue>,
784        instrument_id: Option<&InstrumentId>,
785        strategy_id: Option<&StrategyId>,
786        account_id: Option<&AccountId>,
787        side: Option<OrderSide>,
788    ) -> usize {
789        self.cache()
790            .orders_inflight_count(venue, instrument_id, strategy_id, account_id, side)
791    }
792
793    /// Returns the count of all orders matching the optional filter parameters.
794    ///
795    /// # Panics
796    ///
797    /// Panics if the cache is already mutably borrowed.
798    #[must_use]
799    pub fn orders_total_count(
800        &self,
801        venue: Option<&Venue>,
802        instrument_id: Option<&InstrumentId>,
803        strategy_id: Option<&StrategyId>,
804        account_id: Option<&AccountId>,
805        side: Option<OrderSide>,
806    ) -> usize {
807        self.cache()
808            .orders_total_count(venue, instrument_id, strategy_id, account_id, side)
809    }
810
811    /// Returns whether any open order matches the optional filter parameters.
812    ///
813    /// # Panics
814    ///
815    /// Panics if the cache is already mutably borrowed.
816    #[must_use]
817    pub fn has_orders_open(
818        &self,
819        venue: Option<&Venue>,
820        instrument_id: Option<&InstrumentId>,
821        strategy_id: Option<&StrategyId>,
822        account_id: Option<&AccountId>,
823        side: Option<OrderSide>,
824    ) -> bool {
825        self.cache()
826            .has_orders_open(venue, instrument_id, strategy_id, account_id, side)
827    }
828
829    /// Returns whether any closed order matches the optional filter parameters.
830    ///
831    /// # Panics
832    ///
833    /// Panics if the cache is already mutably borrowed.
834    #[must_use]
835    pub fn has_orders_closed(
836        &self,
837        venue: Option<&Venue>,
838        instrument_id: Option<&InstrumentId>,
839        strategy_id: Option<&StrategyId>,
840        account_id: Option<&AccountId>,
841        side: Option<OrderSide>,
842    ) -> bool {
843        self.cache()
844            .has_orders_closed(venue, instrument_id, strategy_id, account_id, side)
845    }
846
847    /// Returns whether any locally active order matches the optional filter parameters.
848    ///
849    /// # Panics
850    ///
851    /// Panics if the cache is already mutably borrowed.
852    #[must_use]
853    pub fn has_orders_active_local(
854        &self,
855        venue: Option<&Venue>,
856        instrument_id: Option<&InstrumentId>,
857        strategy_id: Option<&StrategyId>,
858        account_id: Option<&AccountId>,
859        side: Option<OrderSide>,
860    ) -> bool {
861        self.cache()
862            .has_orders_active_local(venue, instrument_id, strategy_id, account_id, side)
863    }
864
865    /// Returns whether any emulated order matches the optional filter parameters.
866    ///
867    /// # Panics
868    ///
869    /// Panics if the cache is already mutably borrowed.
870    #[must_use]
871    pub fn has_orders_emulated(
872        &self,
873        venue: Option<&Venue>,
874        instrument_id: Option<&InstrumentId>,
875        strategy_id: Option<&StrategyId>,
876        account_id: Option<&AccountId>,
877        side: Option<OrderSide>,
878    ) -> bool {
879        self.cache()
880            .has_orders_emulated(venue, instrument_id, strategy_id, account_id, side)
881    }
882
883    /// Returns whether any in-flight order matches the optional filter parameters.
884    ///
885    /// # Panics
886    ///
887    /// Panics if the cache is already mutably borrowed.
888    #[must_use]
889    pub fn has_orders_inflight(
890        &self,
891        venue: Option<&Venue>,
892        instrument_id: Option<&InstrumentId>,
893        strategy_id: Option<&StrategyId>,
894        account_id: Option<&AccountId>,
895        side: Option<OrderSide>,
896    ) -> bool {
897        self.cache()
898            .has_orders_inflight(venue, instrument_id, strategy_id, account_id, side)
899    }
900
901    /// Returns whether any order matches the optional filter parameters.
902    ///
903    /// # Panics
904    ///
905    /// Panics if the cache is already mutably borrowed.
906    #[must_use]
907    pub fn has_orders(
908        &self,
909        venue: Option<&Venue>,
910        instrument_id: Option<&InstrumentId>,
911        strategy_id: Option<&StrategyId>,
912        account_id: Option<&AccountId>,
913        side: Option<OrderSide>,
914    ) -> bool {
915        self.cache()
916            .has_orders(venue, instrument_id, strategy_id, account_id, side)
917    }
918
919    /// Returns an owned copy of the order list for the `order_list_id` (if found).
920    ///
921    /// # Panics
922    ///
923    /// Panics if the cache is already mutably borrowed.
924    #[must_use]
925    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<OrderList> {
926        self.cache().order_list(order_list_id).cloned()
927    }
928
929    // panics-doc-ok
930    /// Returns an owned copy of the order list for the `order_list_id`.
931    ///
932    /// # Errors
933    ///
934    /// Returns [`OrderListLookupError::NotFound`] when the order list is not present in the cache.
935    ///
936    /// # Panics
937    ///
938    /// Panics if the cache is already mutably borrowed.
939    pub fn try_order_list(
940        &self,
941        order_list_id: &OrderListId,
942    ) -> Result<OrderList, OrderListLookupError> {
943        self.cache().try_order_list(order_list_id).cloned()
944    }
945
946    /// Returns owned copies of all order lists matching the optional filter parameters.
947    ///
948    /// # Panics
949    ///
950    /// Panics if the cache is already mutably borrowed.
951    #[must_use]
952    pub fn order_lists(
953        &self,
954        venue: Option<&Venue>,
955        instrument_id: Option<&InstrumentId>,
956        strategy_id: Option<&StrategyId>,
957        account_id: Option<&AccountId>,
958    ) -> Vec<OrderList> {
959        self.cache()
960            .order_lists(venue, instrument_id, strategy_id, account_id)
961            .into_iter()
962            .cloned()
963            .collect()
964    }
965
966    /// Returns whether an order list with the `order_list_id` exists.
967    ///
968    /// # Panics
969    ///
970    /// Panics if the cache is already mutably borrowed.
971    #[must_use]
972    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
973        self.cache().order_list_exists(order_list_id)
974    }
975
976    /// Returns owned copies of all orders associated with the `exec_algorithm_id`.
977    ///
978    /// # Panics
979    ///
980    /// Panics if the cache is already mutably borrowed.
981    #[must_use]
982    pub fn orders_for_exec_algorithm(
983        &self,
984        exec_algorithm_id: &ExecAlgorithmId,
985        venue: Option<&Venue>,
986        instrument_id: Option<&InstrumentId>,
987        strategy_id: Option<&StrategyId>,
988        account_id: Option<&AccountId>,
989        side: Option<OrderSide>,
990    ) -> Vec<OrderAny> {
991        self.cache()
992            .orders_for_exec_algorithm(
993                exec_algorithm_id,
994                venue,
995                instrument_id,
996                strategy_id,
997                account_id,
998                side,
999            )
1000            .into_iter()
1001            .map(|order| order.cloned())
1002            .collect()
1003    }
1004
1005    /// Returns owned copies of all orders with the `exec_spawn_id`.
1006    ///
1007    /// # Panics
1008    ///
1009    /// Panics if the cache is already mutably borrowed.
1010    #[must_use]
1011    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderAny> {
1012        self.cache()
1013            .orders_for_exec_spawn(exec_spawn_id)
1014            .into_iter()
1015            .map(|order| order.cloned())
1016            .collect()
1017    }
1018
1019    /// Returns the total order quantity for the `exec_spawn_id`.
1020    ///
1021    /// # Panics
1022    ///
1023    /// Panics if the cache is already mutably borrowed.
1024    #[must_use]
1025    pub fn exec_spawn_total_quantity(
1026        &self,
1027        exec_spawn_id: &ClientOrderId,
1028        active_only: bool,
1029    ) -> Option<Quantity> {
1030        self.cache()
1031            .exec_spawn_total_quantity(exec_spawn_id, active_only)
1032    }
1033
1034    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
1035    ///
1036    /// # Panics
1037    ///
1038    /// Panics if the cache is already mutably borrowed.
1039    #[must_use]
1040    pub fn exec_spawn_total_filled_qty(
1041        &self,
1042        exec_spawn_id: &ClientOrderId,
1043        active_only: bool,
1044    ) -> Option<Quantity> {
1045        self.cache()
1046            .exec_spawn_total_filled_qty(exec_spawn_id, active_only)
1047    }
1048
1049    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
1050    ///
1051    /// # Panics
1052    ///
1053    /// Panics if the cache is already mutably borrowed.
1054    #[must_use]
1055    pub fn exec_spawn_total_leaves_qty(
1056        &self,
1057        exec_spawn_id: &ClientOrderId,
1058        active_only: bool,
1059    ) -> Option<Quantity> {
1060        self.cache()
1061            .exec_spawn_total_leaves_qty(exec_spawn_id, active_only)
1062    }
1063
1064    /// Returns an owned copy of the position for the `position_id` (if found).
1065    ///
1066    /// # Panics
1067    ///
1068    /// Panics if the cache is already mutably borrowed.
1069    #[must_use]
1070    pub fn position(&self, position_id: &PositionId) -> Option<Position> {
1071        self.cache()
1072            .position_ref(position_id)
1073            .map(|position| position.cloned())
1074    }
1075
1076    // panics-doc-ok
1077    /// Returns an owned copy of the position for the `position_id`.
1078    ///
1079    /// # Errors
1080    ///
1081    /// Returns [`PositionLookupError::NotFound`] when the position is not present in the cache.
1082    ///
1083    /// # Panics
1084    ///
1085    /// Panics if the cache is already mutably borrowed.
1086    pub fn try_position(&self, position_id: &PositionId) -> Result<Position, PositionLookupError> {
1087        self.cache()
1088            .try_position_ref(position_id)
1089            .map(|position| position.cloned())
1090    }
1091
1092    /// Returns an owned copy of the position for the `client_order_id` (if found).
1093    ///
1094    /// # Panics
1095    ///
1096    /// Panics if the cache is already mutably borrowed.
1097    #[must_use]
1098    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<Position> {
1099        self.cache()
1100            .position_for_order_ref(client_order_id)
1101            .map(|position| position.cloned())
1102    }
1103
1104    /// Returns the position ID for the `client_order_id` (if found).
1105    ///
1106    /// # Panics
1107    ///
1108    /// Panics if the cache is already mutably borrowed.
1109    #[must_use]
1110    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<PositionId> {
1111        self.cache().position_id(client_order_id).copied()
1112    }
1113
1114    /// Returns owned copies of all positions matching the optional filter parameters.
1115    ///
1116    /// # Panics
1117    ///
1118    /// Panics if the cache is already mutably borrowed.
1119    #[must_use]
1120    pub fn positions(
1121        &self,
1122        venue: Option<&Venue>,
1123        instrument_id: Option<&InstrumentId>,
1124        strategy_id: Option<&StrategyId>,
1125        account_id: Option<&AccountId>,
1126        side: Option<PositionSide>,
1127    ) -> Vec<Position> {
1128        self.cache()
1129            .positions_refs(venue, instrument_id, strategy_id, account_id, side)
1130            .into_iter()
1131            .map(|position| position.cloned())
1132            .collect()
1133    }
1134
1135    /// Returns owned copies of all open positions matching the optional filter parameters.
1136    ///
1137    /// # Panics
1138    ///
1139    /// Panics if the cache is already mutably borrowed.
1140    #[must_use]
1141    pub fn positions_open(
1142        &self,
1143        venue: Option<&Venue>,
1144        instrument_id: Option<&InstrumentId>,
1145        strategy_id: Option<&StrategyId>,
1146        account_id: Option<&AccountId>,
1147        side: Option<PositionSide>,
1148    ) -> Vec<Position> {
1149        self.cache()
1150            .positions_open_refs(venue, instrument_id, strategy_id, account_id, side)
1151            .into_iter()
1152            .map(|position| position.cloned())
1153            .collect()
1154    }
1155
1156    /// Returns owned copies of all closed positions matching the optional filter parameters.
1157    ///
1158    /// # Panics
1159    ///
1160    /// Panics if the cache is already mutably borrowed.
1161    #[must_use]
1162    pub fn positions_closed(
1163        &self,
1164        venue: Option<&Venue>,
1165        instrument_id: Option<&InstrumentId>,
1166        strategy_id: Option<&StrategyId>,
1167        account_id: Option<&AccountId>,
1168        side: Option<PositionSide>,
1169    ) -> Vec<Position> {
1170        self.cache()
1171            .positions_closed_refs(venue, instrument_id, strategy_id, account_id, side)
1172            .into_iter()
1173            .map(|position| position.cloned())
1174            .collect()
1175    }
1176
1177    /// Returns whether a position with the `position_id` exists.
1178    ///
1179    /// # Panics
1180    ///
1181    /// Panics if the cache is already mutably borrowed.
1182    #[must_use]
1183    pub fn position_exists(&self, position_id: &PositionId) -> bool {
1184        self.cache().position_exists(position_id)
1185    }
1186
1187    /// Returns whether a position with the `position_id` is open.
1188    ///
1189    /// # Panics
1190    ///
1191    /// Panics if the cache is already mutably borrowed.
1192    #[must_use]
1193    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
1194        self.cache().is_position_open(position_id)
1195    }
1196
1197    /// Returns whether a position with the `position_id` is closed.
1198    ///
1199    /// # Panics
1200    ///
1201    /// Panics if the cache is already mutably borrowed.
1202    #[must_use]
1203    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
1204        self.cache().is_position_closed(position_id)
1205    }
1206
1207    /// Returns the count of all open positions matching the optional filter parameters.
1208    ///
1209    /// # Panics
1210    ///
1211    /// Panics if the cache is already mutably borrowed.
1212    #[must_use]
1213    pub fn positions_open_count(
1214        &self,
1215        venue: Option<&Venue>,
1216        instrument_id: Option<&InstrumentId>,
1217        strategy_id: Option<&StrategyId>,
1218        account_id: Option<&AccountId>,
1219        side: Option<PositionSide>,
1220    ) -> usize {
1221        self.cache()
1222            .positions_open_count(venue, instrument_id, strategy_id, account_id, side)
1223    }
1224
1225    /// Returns the count of all closed positions matching the optional filter parameters.
1226    ///
1227    /// # Panics
1228    ///
1229    /// Panics if the cache is already mutably borrowed.
1230    #[must_use]
1231    pub fn positions_closed_count(
1232        &self,
1233        venue: Option<&Venue>,
1234        instrument_id: Option<&InstrumentId>,
1235        strategy_id: Option<&StrategyId>,
1236        account_id: Option<&AccountId>,
1237        side: Option<PositionSide>,
1238    ) -> usize {
1239        self.cache()
1240            .positions_closed_count(venue, instrument_id, strategy_id, account_id, side)
1241    }
1242
1243    /// Returns the count of all positions matching the optional filter parameters.
1244    ///
1245    /// # Panics
1246    ///
1247    /// Panics if the cache is already mutably borrowed.
1248    #[must_use]
1249    pub fn positions_total_count(
1250        &self,
1251        venue: Option<&Venue>,
1252        instrument_id: Option<&InstrumentId>,
1253        strategy_id: Option<&StrategyId>,
1254        account_id: Option<&AccountId>,
1255        side: Option<PositionSide>,
1256    ) -> usize {
1257        self.cache()
1258            .positions_total_count(venue, instrument_id, strategy_id, account_id, side)
1259    }
1260
1261    /// Returns whether any open position matches the optional filter parameters.
1262    ///
1263    /// # Panics
1264    ///
1265    /// Panics if the cache is already mutably borrowed.
1266    #[must_use]
1267    pub fn has_positions_open(
1268        &self,
1269        venue: Option<&Venue>,
1270        instrument_id: Option<&InstrumentId>,
1271        strategy_id: Option<&StrategyId>,
1272        account_id: Option<&AccountId>,
1273        side: Option<PositionSide>,
1274    ) -> bool {
1275        self.cache()
1276            .has_positions_open(venue, instrument_id, strategy_id, account_id, side)
1277    }
1278
1279    /// Returns whether any closed position matches the optional filter parameters.
1280    ///
1281    /// # Panics
1282    ///
1283    /// Panics if the cache is already mutably borrowed.
1284    #[must_use]
1285    pub fn has_positions_closed(
1286        &self,
1287        venue: Option<&Venue>,
1288        instrument_id: Option<&InstrumentId>,
1289        strategy_id: Option<&StrategyId>,
1290        account_id: Option<&AccountId>,
1291        side: Option<PositionSide>,
1292    ) -> bool {
1293        self.cache()
1294            .has_positions_closed(venue, instrument_id, strategy_id, account_id, side)
1295    }
1296
1297    /// Returns whether any position matches the optional filter parameters.
1298    ///
1299    /// # Panics
1300    ///
1301    /// Panics if the cache is already mutably borrowed.
1302    #[must_use]
1303    pub fn has_positions(
1304        &self,
1305        venue: Option<&Venue>,
1306        instrument_id: Option<&InstrumentId>,
1307        strategy_id: Option<&StrategyId>,
1308        account_id: Option<&AccountId>,
1309        side: Option<PositionSide>,
1310    ) -> bool {
1311        self.cache()
1312            .has_positions(venue, instrument_id, strategy_id, account_id, side)
1313    }
1314
1315    /// Returns the strategy ID for the `client_order_id` (if found).
1316    ///
1317    /// # Panics
1318    ///
1319    /// Panics if the cache is already mutably borrowed.
1320    #[must_use]
1321    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<StrategyId> {
1322        self.cache().strategy_id_for_order(client_order_id).copied()
1323    }
1324
1325    /// Returns the strategy ID for the `position_id` (if found).
1326    ///
1327    /// # Panics
1328    ///
1329    /// Panics if the cache is already mutably borrowed.
1330    #[must_use]
1331    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<StrategyId> {
1332        self.cache().strategy_id_for_position(position_id).copied()
1333    }
1334
1335    // panics-doc-ok
1336    /// Returns the general cache value for the `key` (if found).
1337    ///
1338    /// # Errors
1339    ///
1340    /// Returns an error if the `key` is invalid.
1341    ///
1342    /// # Panics
1343    ///
1344    /// Panics if the cache is already mutably borrowed.
1345    pub fn get(&self, key: &str) -> anyhow::Result<Option<Bytes>> {
1346        let cache = self.cache();
1347        let value = cache.get(key)?;
1348        Ok(value.cloned())
1349    }
1350
1351    /// Returns the price for the `instrument_id` and `price_type` (if found).
1352    ///
1353    /// # Panics
1354    ///
1355    /// Panics if the cache is already mutably borrowed, or if `price_type` is [`PriceType::Mid`]
1356    /// and the quote price precision is already at the maximum fixed precision.
1357    #[must_use]
1358    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
1359        self.cache().price(instrument_id, price_type)
1360    }
1361
1362    /// Returns all quotes for the `instrument_id` (if found).
1363    ///
1364    /// # Panics
1365    ///
1366    /// Panics if the cache is already mutably borrowed.
1367    #[must_use]
1368    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
1369        self.cache().quotes(instrument_id)
1370    }
1371
1372    /// Returns all trades for the `instrument_id` (if found).
1373    ///
1374    /// # Panics
1375    ///
1376    /// Panics if the cache is already mutably borrowed.
1377    #[must_use]
1378    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
1379        self.cache().trades(instrument_id)
1380    }
1381
1382    /// Returns all mark price updates for the `instrument_id` (if found).
1383    ///
1384    /// # Panics
1385    ///
1386    /// Panics if the cache is already mutably borrowed.
1387    #[must_use]
1388    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
1389        self.cache().mark_prices(instrument_id)
1390    }
1391
1392    /// Returns all index price updates for the `instrument_id` (if found).
1393    ///
1394    /// # Panics
1395    ///
1396    /// Panics if the cache is already mutably borrowed.
1397    #[must_use]
1398    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
1399        self.cache().index_prices(instrument_id)
1400    }
1401
1402    /// Returns all funding rate updates for the `instrument_id` (if found).
1403    ///
1404    /// # Panics
1405    ///
1406    /// Panics if the cache is already mutably borrowed.
1407    #[must_use]
1408    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
1409        self.cache().funding_rates(instrument_id)
1410    }
1411
1412    /// Returns all instrument status updates for the `instrument_id` (if found).
1413    ///
1414    /// # Panics
1415    ///
1416    /// Panics if the cache is already mutably borrowed.
1417    #[must_use]
1418    pub fn instrument_statuses(
1419        &self,
1420        instrument_id: &InstrumentId,
1421    ) -> Option<Vec<InstrumentStatus>> {
1422        self.cache().instrument_statuses(instrument_id)
1423    }
1424
1425    /// Returns all bars for the `bar_type` (if found).
1426    ///
1427    /// # Panics
1428    ///
1429    /// Panics if the cache is already mutably borrowed.
1430    #[must_use]
1431    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
1432        self.cache().bars(bar_type)
1433    }
1434
1435    /// Returns an owned copy of the order book for the `instrument_id` (if found).
1436    ///
1437    /// # Panics
1438    ///
1439    /// Panics if the cache is already mutably borrowed.
1440    #[must_use]
1441    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<OrderBook> {
1442        self.cache().order_book(instrument_id).cloned()
1443    }
1444
1445    // panics-doc-ok
1446    /// Returns an owned copy of the order book for the `instrument_id`.
1447    ///
1448    /// # Errors
1449    ///
1450    /// Returns [`OrderBookLookupError::NotFound`] when the order book is not present in the cache.
1451    ///
1452    /// # Panics
1453    ///
1454    /// Panics if the cache is already mutably borrowed.
1455    pub fn try_order_book(
1456        &self,
1457        instrument_id: &InstrumentId,
1458    ) -> Result<OrderBook, OrderBookLookupError> {
1459        self.cache().try_order_book(instrument_id).cloned()
1460    }
1461
1462    /// Returns an owned copy of the own order book for the `instrument_id` (if found).
1463    ///
1464    /// # Panics
1465    ///
1466    /// Panics if the cache is already mutably borrowed.
1467    #[must_use]
1468    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<OwnOrderBook> {
1469        self.cache().own_order_book(instrument_id).cloned()
1470    }
1471
1472    // panics-doc-ok
1473    /// Returns an owned copy of the own order book for the `instrument_id`.
1474    ///
1475    /// # Errors
1476    ///
1477    /// Returns [`OwnOrderBookLookupError::NotFound`] when the own order book is not present in the
1478    /// cache.
1479    ///
1480    /// # Panics
1481    ///
1482    /// Panics if the cache is already mutably borrowed.
1483    pub fn try_own_order_book(
1484        &self,
1485        instrument_id: &InstrumentId,
1486    ) -> Result<OwnOrderBook, OwnOrderBookLookupError> {
1487        self.cache().try_own_order_book(instrument_id).cloned()
1488    }
1489
1490    /// Returns the latest quote for the `instrument_id` (if found).
1491    ///
1492    /// # Panics
1493    ///
1494    /// Panics if the cache is already mutably borrowed.
1495    #[must_use]
1496    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<QuoteTick> {
1497        self.cache().quote(instrument_id).copied()
1498    }
1499
1500    /// Returns the quote at `index` for the `instrument_id` (if found).
1501    ///
1502    /// Index 0 is the most recent.
1503    ///
1504    /// # Panics
1505    ///
1506    /// Panics if the cache is already mutably borrowed.
1507    #[must_use]
1508    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<QuoteTick> {
1509        self.cache().quote_at_index(instrument_id, index).copied()
1510    }
1511
1512    /// Returns the latest trade for the `instrument_id` (if found).
1513    ///
1514    /// # Panics
1515    ///
1516    /// Panics if the cache is already mutably borrowed.
1517    #[must_use]
1518    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<TradeTick> {
1519        self.cache().trade(instrument_id).copied()
1520    }
1521
1522    /// Returns the trade at `index` for the `instrument_id` (if found).
1523    ///
1524    /// Index 0 is the most recent.
1525    ///
1526    /// # Panics
1527    ///
1528    /// Panics if the cache is already mutably borrowed.
1529    #[must_use]
1530    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<TradeTick> {
1531        self.cache().trade_at_index(instrument_id, index).copied()
1532    }
1533
1534    /// Returns the latest mark price update for the `instrument_id` (if found).
1535    ///
1536    /// # Panics
1537    ///
1538    /// Panics if the cache is already mutably borrowed.
1539    #[must_use]
1540    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<MarkPriceUpdate> {
1541        self.cache().mark_price(instrument_id).copied()
1542    }
1543
1544    /// Returns the latest index price update for the `instrument_id` (if found).
1545    ///
1546    /// # Panics
1547    ///
1548    /// Panics if the cache is already mutably borrowed.
1549    #[must_use]
1550    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<IndexPriceUpdate> {
1551        self.cache().index_price(instrument_id).copied()
1552    }
1553
1554    /// Returns the latest funding rate update for the `instrument_id` (if found).
1555    ///
1556    /// # Panics
1557    ///
1558    /// Panics if the cache is already mutably borrowed.
1559    #[must_use]
1560    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<FundingRateUpdate> {
1561        self.cache().funding_rate(instrument_id).copied()
1562    }
1563
1564    /// Returns the latest instrument status update for the `instrument_id` (if found).
1565    ///
1566    /// # Panics
1567    ///
1568    /// Panics if the cache is already mutably borrowed.
1569    #[must_use]
1570    pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<InstrumentStatus> {
1571        self.cache().instrument_status(instrument_id).copied()
1572    }
1573
1574    /// Returns the latest bar for the `bar_type` (if found).
1575    ///
1576    /// # Panics
1577    ///
1578    /// Panics if the cache is already mutably borrowed.
1579    #[must_use]
1580    pub fn bar(&self, bar_type: &BarType) -> Option<Bar> {
1581        self.cache().bar(bar_type).copied()
1582    }
1583
1584    /// Returns the bar at `index` for the `bar_type` (if found).
1585    ///
1586    /// Index 0 is the most recent.
1587    ///
1588    /// # Panics
1589    ///
1590    /// Panics if the cache is already mutably borrowed.
1591    #[must_use]
1592    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<Bar> {
1593        self.cache().bar_at_index(bar_type, index).copied()
1594    }
1595
1596    /// Returns the order book update count for the `instrument_id`.
1597    ///
1598    /// # Panics
1599    ///
1600    /// Panics if the cache is already mutably borrowed.
1601    #[must_use]
1602    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
1603        self.cache().book_update_count(instrument_id)
1604    }
1605
1606    /// Returns the quote tick count for the `instrument_id`.
1607    ///
1608    /// # Panics
1609    ///
1610    /// Panics if the cache is already mutably borrowed.
1611    #[must_use]
1612    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
1613        self.cache().quote_count(instrument_id)
1614    }
1615
1616    /// Returns the trade tick count for the `instrument_id`.
1617    ///
1618    /// # Panics
1619    ///
1620    /// Panics if the cache is already mutably borrowed.
1621    #[must_use]
1622    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
1623        self.cache().trade_count(instrument_id)
1624    }
1625
1626    /// Returns the bar count for the `bar_type`.
1627    ///
1628    /// # Panics
1629    ///
1630    /// Panics if the cache is already mutably borrowed.
1631    #[must_use]
1632    pub fn bar_count(&self, bar_type: &BarType) -> usize {
1633        self.cache().bar_count(bar_type)
1634    }
1635
1636    /// Returns whether the cache contains an order book for the `instrument_id`.
1637    ///
1638    /// # Panics
1639    ///
1640    /// Panics if the cache is already mutably borrowed.
1641    #[must_use]
1642    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
1643        self.cache().has_order_book(instrument_id)
1644    }
1645
1646    /// Returns whether the cache contains quotes for the `instrument_id`.
1647    ///
1648    /// # Panics
1649    ///
1650    /// Panics if the cache is already mutably borrowed.
1651    #[must_use]
1652    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
1653        self.cache().has_quote_ticks(instrument_id)
1654    }
1655
1656    /// Returns whether the cache contains trades for the `instrument_id`.
1657    ///
1658    /// # Panics
1659    ///
1660    /// Panics if the cache is already mutably borrowed.
1661    #[must_use]
1662    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
1663        self.cache().has_trade_ticks(instrument_id)
1664    }
1665
1666    /// Returns whether the cache contains bars for the `bar_type`.
1667    ///
1668    /// # Panics
1669    ///
1670    /// Panics if the cache is already mutably borrowed.
1671    #[must_use]
1672    pub fn has_bars(&self, bar_type: &BarType) -> bool {
1673        self.cache().has_bars(bar_type)
1674    }
1675
1676    /// Returns the exchange rate for the given currencies and price type (if available).
1677    ///
1678    /// # Panics
1679    ///
1680    /// Panics if the cache is already mutably borrowed.
1681    #[must_use]
1682    pub fn get_xrate(
1683        &self,
1684        venue: Venue,
1685        from_currency: Currency,
1686        to_currency: Currency,
1687        price_type: PriceType,
1688    ) -> Option<Decimal> {
1689        self.cache()
1690            .get_xrate(venue, from_currency, to_currency, price_type)
1691    }
1692
1693    /// Returns the mark exchange rate for the currency pair (if set).
1694    ///
1695    /// # Panics
1696    ///
1697    /// Panics if the cache is already mutably borrowed.
1698    #[must_use]
1699    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
1700        self.cache().get_mark_xrate(from_currency, to_currency)
1701    }
1702
1703    /// Returns the yield curve for the `key` (if found).
1704    ///
1705    /// # Panics
1706    ///
1707    /// Panics if the cache is already mutably borrowed.
1708    #[must_use]
1709    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1710        self.cache().yield_curve(key)
1711    }
1712
1713    /// Returns an owned copy of the greeks data for the `instrument_id` (if found).
1714    ///
1715    /// # Panics
1716    ///
1717    /// Panics if the cache is already mutably borrowed.
1718    #[must_use]
1719    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1720        self.cache().greeks(instrument_id)
1721    }
1722
1723    /// Returns exchange-provided option greeks for the `instrument_id` (if found).
1724    ///
1725    /// # Panics
1726    ///
1727    /// Panics if the cache is already mutably borrowed.
1728    #[must_use]
1729    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<OptionGreeks> {
1730        self.cache().option_greeks(instrument_id).copied()
1731    }
1732
1733    /// Returns the currency for the `code` (if found).
1734    ///
1735    /// # Panics
1736    ///
1737    /// Panics if the cache is already mutably borrowed.
1738    #[must_use]
1739    pub fn currency(&self, code: &Ustr) -> Option<Currency> {
1740        self.cache().currency(code).copied()
1741    }
1742
1743    // panics-doc-ok
1744    /// Returns the currency for the `code`.
1745    ///
1746    /// # Errors
1747    ///
1748    /// Returns [`CurrencyLookupError::NotFound`] when the currency is not present in the cache.
1749    ///
1750    /// # Panics
1751    ///
1752    /// Panics if the cache is already mutably borrowed.
1753    pub fn try_currency(&self, code: &Ustr) -> Result<Currency, CurrencyLookupError> {
1754        self.cache().try_currency(code).copied()
1755    }
1756
1757    /// Returns an owned copy of the instrument for the `instrument_id` (if found).
1758    ///
1759    /// # Panics
1760    ///
1761    /// Panics if the cache is already mutably borrowed.
1762    #[must_use]
1763    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1764        self.cache().instrument(instrument_id).cloned()
1765    }
1766
1767    // panics-doc-ok
1768    /// Returns an owned copy of the instrument for the `instrument_id`.
1769    ///
1770    /// # Errors
1771    ///
1772    /// Returns [`InstrumentLookupError::NotFound`] when the instrument is not present in the cache.
1773    ///
1774    /// # Panics
1775    ///
1776    /// Panics if the cache is already mutably borrowed.
1777    pub fn try_instrument(
1778        &self,
1779        instrument_id: &InstrumentId,
1780    ) -> Result<InstrumentAny, InstrumentLookupError> {
1781        self.cache().try_instrument(instrument_id).cloned()
1782    }
1783
1784    /// Returns the instrument IDs in the cache, optionally filtered by `venue`.
1785    ///
1786    /// # Panics
1787    ///
1788    /// Panics if the cache is already mutably borrowed.
1789    #[must_use]
1790    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1791        self.cache()
1792            .instrument_ids(venue)
1793            .into_iter()
1794            .copied()
1795            .collect()
1796    }
1797
1798    /// Returns owned copies of all instruments for the `venue`.
1799    ///
1800    /// # Panics
1801    ///
1802    /// Panics if the cache is already mutably borrowed.
1803    #[must_use]
1804    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<InstrumentAny> {
1805        self.cache()
1806            .instruments(venue, underlying)
1807            .into_iter()
1808            .cloned()
1809            .collect()
1810    }
1811
1812    /// Returns owned copies of all instruments for the `venue`, parent `root`, and instrument
1813    /// `class`.
1814    ///
1815    /// # Panics
1816    ///
1817    /// Panics if the cache is already mutably borrowed.
1818    #[must_use]
1819    pub fn instruments_by_parent(
1820        &self,
1821        venue: &Venue,
1822        root: &Ustr,
1823        class: InstrumentClass,
1824    ) -> Vec<InstrumentAny> {
1825        self.cache()
1826            .instruments_by_parent(venue, root, class)
1827            .into_iter()
1828            .cloned()
1829            .collect()
1830    }
1831
1832    /// Returns the bar types in the cache, optionally filtered by instrument and price type.
1833    ///
1834    /// # Panics
1835    ///
1836    /// Panics if the cache is already mutably borrowed.
1837    #[must_use]
1838    pub fn bar_types(
1839        &self,
1840        instrument_id: Option<&InstrumentId>,
1841        price_type: Option<&PriceType>,
1842        aggregation_source: AggregationSource,
1843    ) -> Vec<BarType> {
1844        self.cache()
1845            .bar_types(instrument_id, price_type, aggregation_source)
1846            .into_iter()
1847            .copied()
1848            .collect()
1849    }
1850
1851    /// Returns an owned copy of the synthetic instrument for the `instrument_id` (if found).
1852    ///
1853    /// # Panics
1854    ///
1855    /// Panics if the cache is already mutably borrowed.
1856    #[must_use]
1857    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<SyntheticInstrument> {
1858        self.cache().synthetic(instrument_id).cloned()
1859    }
1860
1861    // panics-doc-ok
1862    /// Returns an owned copy of the synthetic instrument for the `instrument_id`.
1863    ///
1864    /// # Errors
1865    ///
1866    /// Returns [`SyntheticInstrumentLookupError::NotFound`] when the synthetic instrument is not
1867    /// present in the cache.
1868    ///
1869    /// # Panics
1870    ///
1871    /// Panics if the cache is already mutably borrowed.
1872    pub fn try_synthetic(
1873        &self,
1874        instrument_id: &InstrumentId,
1875    ) -> Result<SyntheticInstrument, SyntheticInstrumentLookupError> {
1876        self.cache().try_synthetic(instrument_id).cloned()
1877    }
1878
1879    /// Returns the synthetic instrument IDs in the cache.
1880    ///
1881    /// # Panics
1882    ///
1883    /// Panics if the cache is already mutably borrowed.
1884    #[must_use]
1885    pub fn synthetic_ids(&self) -> Vec<InstrumentId> {
1886        self.cache().synthetic_ids().into_iter().copied().collect()
1887    }
1888
1889    /// Returns owned copies of all synthetic instruments in the cache.
1890    ///
1891    /// # Panics
1892    ///
1893    /// Panics if the cache is already mutably borrowed.
1894    #[must_use]
1895    pub fn synthetics(&self) -> Vec<SyntheticInstrument> {
1896        self.cache().synthetics().into_iter().cloned().collect()
1897    }
1898
1899    /// Returns an owned copy of the pool for the `instrument_id` (if found).
1900    ///
1901    /// # Panics
1902    ///
1903    /// Panics if the cache is already mutably borrowed.
1904    #[cfg(feature = "defi")]
1905    #[must_use]
1906    pub fn pool(&self, instrument_id: &InstrumentId) -> Option<Pool> {
1907        self.cache().pool(instrument_id).cloned()
1908    }
1909
1910    /// Returns the pool instrument IDs in the cache, optionally filtered by `venue`.
1911    ///
1912    /// # Panics
1913    ///
1914    /// Panics if the cache is already mutably borrowed.
1915    #[cfg(feature = "defi")]
1916    #[must_use]
1917    pub fn pool_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1918        self.cache().pool_ids(venue)
1919    }
1920
1921    /// Returns owned copies of all pools in the cache, optionally filtered by `venue`.
1922    ///
1923    /// # Panics
1924    ///
1925    /// Panics if the cache is already mutably borrowed.
1926    #[cfg(feature = "defi")]
1927    #[must_use]
1928    pub fn pools(&self, venue: Option<&Venue>) -> Vec<Pool> {
1929        self.cache().pools(venue).into_iter().cloned().collect()
1930    }
1931
1932    /// Returns an owned copy of the pool profiler for the `instrument_id` (if found).
1933    ///
1934    /// # Panics
1935    ///
1936    /// Panics if the cache is already mutably borrowed.
1937    #[cfg(feature = "defi")]
1938    #[must_use]
1939    pub fn pool_profiler(&self, instrument_id: &InstrumentId) -> Option<PoolProfiler> {
1940        self.cache().pool_profiler(instrument_id).cloned()
1941    }
1942
1943    /// Returns the pool profiler instrument IDs in the cache, optionally filtered by `venue`.
1944    ///
1945    /// # Panics
1946    ///
1947    /// Panics if the cache is already mutably borrowed.
1948    #[cfg(feature = "defi")]
1949    #[must_use]
1950    pub fn pool_profiler_ids(&self, venue: Option<&Venue>) -> Vec<InstrumentId> {
1951        self.cache().pool_profiler_ids(venue)
1952    }
1953
1954    /// Returns owned copies of all pool profilers in the cache, optionally filtered by `venue`.
1955    ///
1956    /// # Panics
1957    ///
1958    /// Panics if the cache is already mutably borrowed.
1959    #[cfg(feature = "defi")]
1960    #[must_use]
1961    pub fn pool_profilers(&self, venue: Option<&Venue>) -> Vec<PoolProfiler> {
1962        self.cache()
1963            .pool_profilers(venue)
1964            .into_iter()
1965            .cloned()
1966            .collect()
1967    }
1968
1969    /// Returns an owned copy of the account for the `account_id` (if found).
1970    ///
1971    /// # Panics
1972    ///
1973    /// Panics if the cache is already mutably borrowed.
1974    #[must_use]
1975    pub fn account(&self, account_id: &AccountId) -> Option<AccountAny> {
1976        self.cache().account_owned(account_id)
1977    }
1978
1979    // panics-doc-ok
1980    /// Returns an owned copy of the account for the `account_id`.
1981    ///
1982    /// # Errors
1983    ///
1984    /// Returns [`AccountLookupError::NotFound`] when the account is not present in the cache.
1985    ///
1986    /// # Panics
1987    ///
1988    /// Panics if the cache is already mutably borrowed.
1989    pub fn try_account(&self, account_id: &AccountId) -> Result<AccountAny, AccountLookupError> {
1990        self.cache()
1991            .try_account(account_id)
1992            .map(|account| account.cloned())
1993    }
1994
1995    /// Returns an owned copy of the account for the `venue` (if found).
1996    ///
1997    /// # Panics
1998    ///
1999    /// Panics if the cache is already mutably borrowed.
2000    #[must_use]
2001    pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountAny> {
2002        self.cache().account_for_venue_owned(venue)
2003    }
2004
2005    /// Returns the account ID for the `venue` (if found).
2006    ///
2007    /// # Panics
2008    ///
2009    /// Panics if the cache is already mutably borrowed.
2010    #[must_use]
2011    pub fn account_id(&self, venue: &Venue) -> Option<AccountId> {
2012        self.cache().account_id(venue).copied()
2013    }
2014
2015    /// Returns owned copies of all accounts matching the `account_id`.
2016    ///
2017    /// # Panics
2018    ///
2019    /// Panics if the cache is already mutably borrowed.
2020    #[must_use]
2021    pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountAny> {
2022        self.cache()
2023            .accounts(account_id)
2024            .into_iter()
2025            .map(|account| account.cloned())
2026            .collect()
2027    }
2028
2029    /// Returns owned copies of every account in the cache.
2030    ///
2031    /// # Panics
2032    ///
2033    /// Panics if the cache is already mutably borrowed.
2034    #[must_use]
2035    pub fn accounts_all(&self) -> Vec<AccountAny> {
2036        self.cache().accounts_all_owned()
2037    }
2038
    /// Immutably borrows the underlying [`Cache`], returning the borrow guard.
    ///
    /// Used by the read accessors on this type; the borrow lasts as long as the returned
    /// [`Ref`] is alive.
    ///
    /// # Panics
    ///
    /// Panics if the cache is already mutably borrowed.
    fn cache(&self) -> Ref<'_, Cache> {
        self.cache.borrow()
    }
2042}
2043
// Filter sources resolved from an order or position query.
//
// Captures the three states of a multi-key index intersection without committing to an owned
// result set: no filters at all (the caller iterates the bucket directly), one or more filter
// sources resolved successfully (intersect them lazily), or one filter resolved to no entries
// at all (the result is unconditionally empty).
//
// `K` is the key type stored in the borrowed index sets; `'a` is the lifetime of those borrows.
enum FilterSources<'a, K> {
    // No filters were supplied: the caller iterates the bucket directly.
    Unfiltered,
    // A filter resolved to no entries: the result is unconditionally empty.
    Empty,
    // One or more filters resolved to borrowed index sets that still need intersecting.
    Sets(Vec<&'a AHashSet<K>>),
}
2055
2056// Intersects a non-empty collection of filter sources by sorting them ascending by length and
2057// driving the loop from the smallest set, collecting one `AHashSet` of matching keys.
2058//
2059// Single-source inputs short-circuit to a direct `AHashSet::clone` (memcopy of the bucket
2060// table) rather than rehashing each entry through `iter().copied().collect()`.
2061fn intersect_filter_sources<K>(mut sources: Vec<&AHashSet<K>>) -> AHashSet<K>
2062where
2063    K: Copy + Eq + std::hash::Hash,
2064{
2065    debug_assert!(!sources.is_empty());
2066    sources.sort_unstable_by_key(|s| s.len());
2067    let driver = sources[0];
2068    let rest = &sources[1..];
2069
2070    if rest.is_empty() {
2071        return driver.clone();
2072    }
2073
2074    driver
2075        .iter()
2076        .filter(|id| rest.iter().all(|s| s.contains(id)))
2077        .copied()
2078        .collect()
2079}
2080
2081// Intersects `bucket` with one or more filter sources.
2082//
2083// For exactly one filter source, iterates the larger of (bucket, filter) and looks up in the
2084// smaller. The larger set scans linearly (HW-prefetcher friendly) and the smaller stays hot in
2085// cache, which empirically beats the size-ordered approach when the smaller filter is too
2086// large to fit in L1 (e.g., a 20k-entry venue filter against a 100k-entry bucket). For two or
2087// more filters the size-ordered driver is reinstated and the bucket joins the source list.
2088fn intersect_pair_or_many<'a, K>(
2089    bucket: &'a AHashSet<K>,
2090    mut sources: Vec<&'a AHashSet<K>>,
2091) -> AHashSet<K>
2092where
2093    K: Copy + Eq + std::hash::Hash,
2094{
2095    debug_assert!(!sources.is_empty());
2096    if sources.len() == 1 {
2097        let filter = sources[0];
2098        let (larger, smaller) = if bucket.len() >= filter.len() {
2099            (bucket, filter)
2100        } else {
2101            (filter, bucket)
2102        };
2103        return larger.intersection(smaller).copied().collect();
2104    }
2105
2106    sources.push(bucket);
2107    intersect_filter_sources(sources)
2108}
2109
/// A common in-memory `Cache` for market and execution related data.
///
/// Each field is a map (or index) keyed by the relevant identifier. An optional database
/// adapter provides persistence; see [`Cache::has_backing`].
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
)]
pub struct Cache {
    // Cache configuration, validated in `Cache::new`.
    config: CacheConfig,
    // Lookup indexes, populated by `build_index` and the `cache_*` loaders.
    index: CacheIndex,
    // Optional persistence adapter; `None` means the cache has no backing database.
    database: Option<Box<dyn CacheDatabaseAdapter>>,
    // General key/value entries, reloaded by `cache_general`.
    general: AHashMap<String, Bytes>,
    // Currencies keyed by `Ustr` (presumably the currency code; confirm).
    currencies: AHashMap<Ustr, Currency>,
    // Instruments keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
    // Synthetic instruments keyed by instrument ID.
    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
    // Order books keyed by instrument ID.
    books: AHashMap<InstrumentId, OrderBook>,
    // Own order books keyed by instrument ID.
    own_books: AHashMap<InstrumentId, OwnOrderBook>,
    // Bounded per-instrument quote history.
    quotes: AHashMap<InstrumentId, BoundedVecDeque<QuoteTick>>,
    // Bounded per-instrument trade history.
    trades: AHashMap<InstrumentId, BoundedVecDeque<TradeTick>>,
    // Exchange rates keyed by a currency pair.
    mark_xrates: AHashMap<(Currency, Currency), f64>,
    // Bounded per-instrument mark price history.
    mark_prices: AHashMap<InstrumentId, BoundedVecDeque<MarkPriceUpdate>>,
    // Bounded per-instrument index price history.
    index_prices: AHashMap<InstrumentId, BoundedVecDeque<IndexPriceUpdate>>,
    // Bounded per-instrument funding rate history.
    funding_rates: AHashMap<InstrumentId, BoundedVecDeque<FundingRateUpdate>>,
    // Bounded per-instrument status history.
    instrument_statuses: AHashMap<InstrumentId, BoundedVecDeque<InstrumentStatus>>,
    // Bounded bar history keyed by bar type.
    bars: AHashMap<BarType, BoundedVecDeque<Bar>>,
    // Greeks data keyed by instrument ID.
    greeks: AHashMap<InstrumentId, GreeksData>,
    // Option greeks keyed by instrument ID.
    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
    // Yield curve data keyed by a string (presumably the curve name; confirm).
    yield_curves: AHashMap<String, YieldCurveData>,
    // Accounts keyed by account ID, each held in a `SharedCell`.
    accounts: AHashMap<AccountId, SharedCell<AccountAny>>,
    // Orders keyed by client order ID, each held in a `SharedCell`.
    orders: AHashMap<ClientOrderId, SharedCell<OrderAny>>,
    // Order lists keyed by order list ID.
    order_lists: AHashMap<OrderListId, OrderList>,
    // Positions keyed by position ID, each held in a `SharedCell`.
    positions: AHashMap<PositionId, SharedCell<Position>>,
    // Serialized position snapshots keyed by position ID.
    position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
    // DeFi-specific cache state, only compiled with the `defi` feature.
    #[cfg(feature = "defi")]
    pub(crate) defi: crate::defi::cache::DefiCache,
}
2144
2145impl Debug for Cache {
2146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2147        f.debug_struct(stringify!(Cache))
2148            .field("config", &self.config)
2149            .field("index", &self.index)
2150            .field("general", &self.general)
2151            .field("currencies", &self.currencies)
2152            .field("instruments", &self.instruments)
2153            .field("synthetics", &self.synthetics)
2154            .field("books", &self.books)
2155            .field("own_books", &self.own_books)
2156            .field("quotes", &self.quotes)
2157            .field("trades", &self.trades)
2158            .field("mark_xrates", &self.mark_xrates)
2159            .field("mark_prices", &self.mark_prices)
2160            .field("index_prices", &self.index_prices)
2161            .field("funding_rates", &self.funding_rates)
2162            .field("instrument_statuses", &self.instrument_statuses)
2163            .field("bars", &self.bars)
2164            .field("greeks", &self.greeks)
2165            .field("option_greeks", &self.option_greeks)
2166            .field("yield_curves", &self.yield_curves)
2167            .field("accounts", &self.accounts)
2168            .field("orders", &self.orders)
2169            .field("order_lists", &self.order_lists)
2170            .field("positions", &self.positions)
2171            .field("position_snapshots", &self.position_snapshots)
2172            .finish()
2173    }
2174}
2175
2176impl Default for Cache {
2177    /// Creates a new default [`Cache`] instance.
2178    fn default() -> Self {
2179        Self::new(Some(CacheConfig::default()), None)
2180    }
2181}
2182
2183impl Cache {
2184    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
2185    #[must_use]
2186    /// # Note
2187    ///
2188    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
2189    ///
2190    /// # Panics
2191    ///
2192    /// Panics if the cache config has a zero tick or bar capacity.
2193    pub fn new(
2194        config: Option<CacheConfig>,
2195        database: Option<Box<dyn CacheDatabaseAdapter>>,
2196    ) -> Self {
2197        let config = config.unwrap_or_default();
2198        config.validate().expect("invalid `CacheConfig`");
2199
2200        Self {
2201            config,
2202            index: CacheIndex::default(),
2203            database,
2204            general: AHashMap::new(),
2205            currencies: AHashMap::new(),
2206            instruments: AHashMap::new(),
2207            synthetics: AHashMap::new(),
2208            books: AHashMap::new(),
2209            own_books: AHashMap::new(),
2210            quotes: AHashMap::new(),
2211            trades: AHashMap::new(),
2212            mark_xrates: AHashMap::new(),
2213            mark_prices: AHashMap::new(),
2214            index_prices: AHashMap::new(),
2215            funding_rates: AHashMap::new(),
2216            instrument_statuses: AHashMap::new(),
2217            bars: AHashMap::new(),
2218            greeks: AHashMap::new(),
2219            option_greeks: AHashMap::new(),
2220            yield_curves: AHashMap::new(),
2221            accounts: AHashMap::new(),
2222            orders: AHashMap::new(),
2223            order_lists: AHashMap::new(),
2224            positions: AHashMap::new(),
2225            position_snapshots: AHashMap::new(),
2226            #[cfg(feature = "defi")]
2227            defi: crate::defi::cache::DefiCache::default(),
2228        }
2229    }
2230
2231    /// Returns the cache instances memory address.
2232    #[must_use]
2233    pub fn memory_address(&self) -> String {
2234        format!("{:?}", std::ptr::from_ref(self))
2235    }
2236
2237    /// Sets the cache database adapter for persistence.
2238    ///
2239    /// This allows setting or replacing the database adapter after cache construction.
2240    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
2241        let type_name = std::any::type_name_of_val(&*database);
2242        log::info!("Cache database adapter set: {type_name}");
2243        self.database = Some(database);
2244    }
2245
2246    // -- COMMANDS --------------------------------------------------------------------------------
2247
2248    /// Clears and reloads general entries from the database into the cache.
2249    ///
2250    /// # Errors
2251    ///
2252    /// Returns an error if loading general cache data fails.
2253    pub fn cache_general(&mut self) -> anyhow::Result<()> {
2254        self.general = match &mut self.database {
2255            Some(db) => db.load()?,
2256            None => AHashMap::new(),
2257        };
2258
2259        log::info!(
2260            "Cached {} general object(s) from database",
2261            self.general.len()
2262        );
2263        Ok(())
2264    }
2265
2266    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
2267    ///
2268    /// # Errors
2269    ///
2270    /// Returns an error if loading all cache data fails.
2271    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
2272        let cache_map = match &self.database {
2273            Some(db) => db.load_all().await?,
2274            None => CacheMap::default(),
2275        };
2276
2277        self.currencies = cache_map.currencies;
2278        self.instruments = cache_map.instruments;
2279        self.synthetics = cache_map.synthetics;
2280        self.accounts = cache_map
2281            .accounts
2282            .into_iter()
2283            .map(|(id, account)| (id, SharedCell::new(account)))
2284            .collect();
2285        self.orders = cache_map
2286            .orders
2287            .into_iter()
2288            .map(|(id, order)| (id, SharedCell::new(order)))
2289            .collect();
2290        self.positions = cache_map
2291            .positions
2292            .into_iter()
2293            .map(|(id, position)| (id, SharedCell::new(position)))
2294            .collect();
2295
2296        if let Some(db) = &self.database {
2297            self.index.order_position = db.load_index_order_position()?;
2298            self.index.order_client = db.load_index_order_client()?;
2299        }
2300
2301        self.assign_position_ids_to_contingencies();
2302        Ok(())
2303    }
2304
2305    /// Clears and reloads the currency cache from the database.
2306    ///
2307    /// # Errors
2308    ///
2309    /// Returns an error if loading currencies cache fails.
2310    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
2311        self.currencies = match &mut self.database {
2312            Some(db) => db.load_currencies().await?,
2313            None => AHashMap::new(),
2314        };
2315
2316        log::info!("Cached {} currencies from database", self.general.len());
2317        Ok(())
2318    }
2319
2320    /// Clears and reloads the instrument cache from the database.
2321    ///
2322    /// # Errors
2323    ///
2324    /// Returns an error if loading instruments cache fails.
2325    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
2326        self.instruments = match &mut self.database {
2327            Some(db) => db.load_instruments().await?,
2328            None => AHashMap::new(),
2329        };
2330
2331        log::info!("Cached {} instruments from database", self.general.len());
2332        Ok(())
2333    }
2334
2335    /// Clears and reloads the synthetic instrument cache from the database.
2336    ///
2337    /// # Errors
2338    ///
2339    /// Returns an error if loading synthetic instruments cache fails.
2340    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
2341        self.synthetics = match &mut self.database {
2342            Some(db) => db.load_synthetics().await?,
2343            None => AHashMap::new(),
2344        };
2345
2346        log::info!(
2347            "Cached {} synthetic instruments from database",
2348            self.general.len()
2349        );
2350        Ok(())
2351    }
2352
2353    /// Clears and reloads the account cache from the database.
2354    ///
2355    /// # Errors
2356    ///
2357    /// Returns an error if loading accounts cache fails.
2358    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
2359        self.accounts = match &mut self.database {
2360            Some(db) => db
2361                .load_accounts()
2362                .await?
2363                .into_iter()
2364                .map(|(id, account)| (id, SharedCell::new(account)))
2365                .collect(),
2366            None => AHashMap::new(),
2367        };
2368
2369        log::info!(
2370            "Cached {} synthetic instruments from database",
2371            self.general.len()
2372        );
2373        Ok(())
2374    }
2375
2376    /// Clears and reloads the order cache from the database.
2377    ///
2378    /// # Errors
2379    ///
2380    /// Returns an error if loading orders cache fails.
2381    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
2382        self.orders = match &mut self.database {
2383            Some(db) => db
2384                .load_orders()
2385                .await?
2386                .into_iter()
2387                .map(|(id, order)| (id, SharedCell::new(order)))
2388                .collect(),
2389            None => AHashMap::new(),
2390        };
2391
2392        if let Some(db) = &self.database {
2393            self.index.order_position = db.load_index_order_position()?;
2394            self.index.order_client = db.load_index_order_client()?;
2395        }
2396
2397        log::info!("Cached {} orders from database", self.general.len());
2398
2399        self.assign_position_ids_to_contingencies();
2400        Ok(())
2401    }
2402
2403    /// Clears and reloads the position cache from the database.
2404    ///
2405    /// # Errors
2406    ///
2407    /// Returns an error if loading positions cache fails.
2408    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
2409        self.positions = match &mut self.database {
2410            Some(db) => db
2411                .load_positions()
2412                .await?
2413                .into_iter()
2414                .map(|(id, position)| (id, SharedCell::new(position)))
2415                .collect(),
2416            None => AHashMap::new(),
2417        };
2418
2419        log::info!("Cached {} positions from database", self.general.len());
2420        Ok(())
2421    }
2422
2423    /// Clears the current cache index and re-build.
2424    pub fn build_index(&mut self) {
2425        log::debug!("Building index");
2426
2427        // Index accounts
2428        for account_id in self.accounts.keys() {
2429            self.index
2430                .venue_account
2431                .insert(account_id.get_issuer(), *account_id);
2432        }
2433
2434        // Index orders
2435        for (client_order_id, order_cell) in &self.orders {
2436            let order = order_cell.borrow();
2437            let instrument_id = order.instrument_id();
2438            let venue = instrument_id.venue;
2439            let strategy_id = order.strategy_id();
2440
2441            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
2442            self.index
2443                .venue_orders
2444                .entry(venue)
2445                .or_default()
2446                .insert(*client_order_id);
2447
2448            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
2449            if let Some(venue_order_id) = order.venue_order_id() {
2450                self.index
2451                    .venue_order_ids
2452                    .insert(venue_order_id, *client_order_id);
2453            }
2454
2455            // 3: Build index.order_position -> {ClientOrderId, PositionId}
2456            if let Some(position_id) = order.position_id() {
2457                self.index
2458                    .order_position
2459                    .insert(*client_order_id, position_id);
2460            }
2461
2462            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
2463            self.index
2464                .order_strategy
2465                .insert(*client_order_id, order.strategy_id());
2466
2467            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
2468            self.index
2469                .instrument_orders
2470                .entry(instrument_id)
2471                .or_default()
2472                .insert(*client_order_id);
2473
2474            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
2475            self.index
2476                .strategy_orders
2477                .entry(strategy_id)
2478                .or_default()
2479                .insert(*client_order_id);
2480
2481            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
2482            if let Some(account_id) = order.account_id() {
2483                self.index
2484                    .account_orders
2485                    .entry(account_id)
2486                    .or_default()
2487                    .insert(*client_order_id);
2488            }
2489
2490            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
2491            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2492                self.index
2493                    .exec_algorithm_orders
2494                    .entry(exec_algorithm_id)
2495                    .or_default()
2496                    .insert(*client_order_id);
2497            }
2498
2499            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
2500            if let Some(exec_spawn_id) = order.exec_spawn_id() {
2501                self.index
2502                    .exec_spawn_orders
2503                    .entry(exec_spawn_id)
2504                    .or_default()
2505                    .insert(*client_order_id);
2506            }
2507
2508            // 9: Build index.orders -> {ClientOrderId}
2509            self.index.orders.insert(*client_order_id);
2510
2511            // 10: Build index.orders_active_local -> {ClientOrderId}
2512            if order.is_active_local() {
2513                self.index.orders_active_local.insert(*client_order_id);
2514            }
2515
2516            // 11: Build index.orders_open -> {ClientOrderId}
2517            if order.is_open() {
2518                self.index.orders_open.insert(*client_order_id);
2519            }
2520
2521            // 12: Build index.orders_closed -> {ClientOrderId}
2522            if order.is_closed() {
2523                self.index.orders_closed.insert(*client_order_id);
2524            }
2525
2526            // 13: Build index.orders_emulated -> {ClientOrderId}
2527            if let Some(emulation_trigger) = order.emulation_trigger()
2528                && emulation_trigger != TriggerType::NoTrigger
2529                && !order.is_closed()
2530            {
2531                self.index.orders_emulated.insert(*client_order_id);
2532            }
2533
2534            // 14: Build index.orders_inflight -> {ClientOrderId}
2535            if order.is_inflight() {
2536                self.index.orders_inflight.insert(*client_order_id);
2537            }
2538
2539            // 15: Build index.strategies -> {StrategyId}
2540            self.index.strategies.insert(strategy_id);
2541
2542            // 16: Build index.strategies -> {ExecAlgorithmId}
2543            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2544                self.index.exec_algorithms.insert(exec_algorithm_id);
2545            }
2546        }
2547
2548        // Index positions
2549        for (position_id, position_cell) in &self.positions {
2550            let position = position_cell.borrow();
2551            let instrument_id = position.instrument_id;
2552            let venue = instrument_id.venue;
2553            let strategy_id = position.strategy_id;
2554
2555            // 1: Build index.venue_positions -> {Venue, {PositionId}}
2556            self.index
2557                .venue_positions
2558                .entry(venue)
2559                .or_default()
2560                .insert(*position_id);
2561
2562            // 2: Build index.position_strategy -> {PositionId, StrategyId}
2563            self.index
2564                .position_strategy
2565                .insert(*position_id, position.strategy_id);
2566
2567            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
2568            self.index
2569                .position_orders
2570                .entry(*position_id)
2571                .or_default()
2572                .extend(position.client_order_ids());
2573
2574            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
2575            self.index
2576                .instrument_positions
2577                .entry(instrument_id)
2578                .or_default()
2579                .insert(*position_id);
2580
2581            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
2582            self.index
2583                .strategy_positions
2584                .entry(strategy_id)
2585                .or_default()
2586                .insert(*position_id);
2587
2588            // 6: Build index.account_positions -> {AccountId, {PositionId}}
2589            self.index
2590                .account_positions
2591                .entry(position.account_id)
2592                .or_default()
2593                .insert(*position_id);
2594
2595            // 7: Build index.positions -> {PositionId}
2596            self.index.positions.insert(*position_id);
2597
2598            // 8: Build index.positions_open -> {PositionId}
2599            if position.is_open() {
2600                self.index.positions_open.insert(*position_id);
2601            }
2602
2603            // 9: Build index.positions_closed -> {PositionId}
2604            if position.is_closed() {
2605                self.index.positions_closed.insert(*position_id);
2606            }
2607
2608            // 10: Build index.strategies -> {StrategyId}
2609            self.index.strategies.insert(strategy_id);
2610        }
2611    }
2612
    /// Returns whether the cache has a backing database.
    ///
    /// This is `true` exactly when a database adapter was supplied to [`Cache::new`] or set
    /// later through [`Cache::set_database`].
    #[must_use]
    pub const fn has_backing(&self) -> bool {
        self.database.is_some()
    }
2618
2619    // Calculate the unrealized profit and loss (PnL) for `position`.
2620    #[must_use]
2621    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
2622        let Some(quote) = self.quote(&position.instrument_id) else {
2623            log::warn!(
2624                "Cannot calculate unrealized PnL for {}, no quotes for {}",
2625                position.id,
2626                position.instrument_id
2627            );
2628            return None;
2629        };
2630
2631        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
2632        let last = match position.side {
2633            PositionSide::Flat | PositionSide::NoPositionSide => {
2634                return Some(Money::zero(position.settlement_currency));
2635            }
2636            PositionSide::Long => quote.bid_price,
2637            PositionSide::Short => quote.ask_price,
2638        };
2639
2640        Some(position.unrealized_pnl(last))
2641    }
2642
2643    /// Checks integrity of data within the cache.
2644    ///
2645    /// All data should be loaded from the database prior to this call.
2646    /// If an error is found then a log error message will also be produced.
2647    ///
2648    /// # Panics
2649    ///
2650    /// Panics if failure calling system clock.
2651    #[must_use]
2652    pub fn check_integrity(&mut self) -> bool {
2653        let mut error_count = 0;
2654        let failure = "Integrity failure";
2655
2656        // Get current timestamp in microseconds
2657        let timestamp_us = SystemTime::now()
2658            .duration_since(UNIX_EPOCH)
2659            .expect("Time went backwards")
2660            .as_micros();
2661
2662        log::info!("Checking data integrity");
2663
2664        // Check object caches
2665        for account_id in self.accounts.keys() {
2666            if !self
2667                .index
2668                .venue_account
2669                .contains_key(&account_id.get_issuer())
2670            {
2671                log::error!(
2672                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
2673                );
2674                error_count += 1;
2675            }
2676        }
2677
2678        for (client_order_id, order_cell) in &self.orders {
2679            let order = order_cell.borrow();
2680
2681            if !self.index.order_strategy.contains_key(client_order_id) {
2682                log::error!(
2683                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
2684                );
2685                error_count += 1;
2686            }
2687
2688            if !self.index.orders.contains(client_order_id) {
2689                log::error!(
2690                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
2691                );
2692                error_count += 1;
2693            }
2694
2695            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
2696                log::error!(
2697                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
2698                );
2699                error_count += 1;
2700            }
2701
2702            if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
2703            {
2704                log::error!(
2705                    "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
2706                );
2707                error_count += 1;
2708            }
2709
2710            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
2711                log::error!(
2712                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
2713                );
2714                error_count += 1;
2715            }
2716
2717            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
2718                log::error!(
2719                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
2720                );
2721                error_count += 1;
2722            }
2723
2724            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
2725                if !self
2726                    .index
2727                    .exec_algorithm_orders
2728                    .contains_key(&exec_algorithm_id)
2729                {
2730                    log::error!(
2731                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
2732                    );
2733                    error_count += 1;
2734                }
2735
2736                if order.exec_spawn_id().is_none()
2737                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
2738                {
2739                    log::error!(
2740                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
2741                    );
2742                    error_count += 1;
2743                }
2744            }
2745        }
2746
2747        for (position_id, position_cell) in &self.positions {
2748            let position = position_cell.borrow();
2749
2750            if !self.index.position_strategy.contains_key(position_id) {
2751                log::error!(
2752                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
2753                );
2754                error_count += 1;
2755            }
2756
2757            if !self.index.position_orders.contains_key(position_id) {
2758                log::error!(
2759                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
2760                );
2761                error_count += 1;
2762            }
2763
2764            if !self.index.positions.contains(position_id) {
2765                log::error!(
2766                    "{failure} in positions: {position_id} not found in `self.index.positions`",
2767                );
2768                error_count += 1;
2769            }
2770
2771            if position.is_open() && !self.index.positions_open.contains(position_id) {
2772                log::error!(
2773                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
2774                );
2775                error_count += 1;
2776            }
2777
2778            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
2779                log::error!(
2780                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
2781                );
2782                error_count += 1;
2783            }
2784        }
2785
2786        // Check indexes
2787        for account_id in self.index.venue_account.values() {
2788            if !self.accounts.contains_key(account_id) {
2789                log::error!(
2790                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
2791                );
2792                error_count += 1;
2793            }
2794        }
2795
2796        for client_order_id in self.index.venue_order_ids.values() {
2797            if !self.orders.contains_key(client_order_id) {
2798                log::error!(
2799                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
2800                );
2801                error_count += 1;
2802            }
2803        }
2804
2805        for client_order_id in self.index.client_order_ids.keys() {
2806            if !self.orders.contains_key(client_order_id) {
2807                log::error!(
2808                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
2809                );
2810                error_count += 1;
2811            }
2812        }
2813
2814        for client_order_id in self.index.order_position.keys() {
2815            if !self.orders.contains_key(client_order_id) {
2816                log::error!(
2817                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
2818                );
2819                error_count += 1;
2820            }
2821        }
2822
2823        // Check indexes
2824        for client_order_id in self.index.order_strategy.keys() {
2825            if !self.orders.contains_key(client_order_id) {
2826                log::error!(
2827                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
2828                );
2829                error_count += 1;
2830            }
2831        }
2832
2833        for position_id in self.index.position_strategy.keys() {
2834            if !self.positions.contains_key(position_id) {
2835                log::error!(
2836                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
2837                );
2838                error_count += 1;
2839            }
2840        }
2841
2842        for position_id in self.index.position_orders.keys() {
2843            if !self.positions.contains_key(position_id) {
2844                log::error!(
2845                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
2846                );
2847                error_count += 1;
2848            }
2849        }
2850
2851        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
2852            for client_order_id in client_order_ids {
2853                if !self.orders.contains_key(client_order_id) {
2854                    log::error!(
2855                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
2856                    );
2857                    error_count += 1;
2858                }
2859            }
2860        }
2861
2862        for instrument_id in self.index.instrument_positions.keys() {
2863            if !self.index.instrument_orders.contains_key(instrument_id) {
2864                log::error!(
2865                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
2866                );
2867                error_count += 1;
2868            }
2869        }
2870
2871        for client_order_ids in self.index.strategy_orders.values() {
2872            for client_order_id in client_order_ids {
2873                if !self.orders.contains_key(client_order_id) {
2874                    log::error!(
2875                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
2876                    );
2877                    error_count += 1;
2878                }
2879            }
2880        }
2881
2882        for position_ids in self.index.strategy_positions.values() {
2883            for position_id in position_ids {
2884                if !self.positions.contains_key(position_id) {
2885                    log::error!(
2886                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
2887                    );
2888                    error_count += 1;
2889                }
2890            }
2891        }
2892
2893        for client_order_id in &self.index.orders {
2894            if !self.orders.contains_key(client_order_id) {
2895                log::error!(
2896                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
2897                );
2898                error_count += 1;
2899            }
2900        }
2901
2902        for client_order_id in &self.index.orders_emulated {
2903            if !self.orders.contains_key(client_order_id) {
2904                log::error!(
2905                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
2906                );
2907                error_count += 1;
2908            }
2909        }
2910
2911        for client_order_id in &self.index.orders_active_local {
2912            if !self.orders.contains_key(client_order_id) {
2913                log::error!(
2914                    "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
2915                );
2916                error_count += 1;
2917            }
2918        }
2919
2920        for client_order_id in &self.index.orders_inflight {
2921            if !self.orders.contains_key(client_order_id) {
2922                log::error!(
2923                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
2924                );
2925                error_count += 1;
2926            }
2927        }
2928
2929        for client_order_id in &self.index.orders_open {
2930            if !self.orders.contains_key(client_order_id) {
2931                log::error!(
2932                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
2933                );
2934                error_count += 1;
2935            }
2936        }
2937
2938        for client_order_id in &self.index.orders_closed {
2939            if !self.orders.contains_key(client_order_id) {
2940                log::error!(
2941                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
2942                );
2943                error_count += 1;
2944            }
2945        }
2946
2947        for position_id in &self.index.positions {
2948            if !self.positions.contains_key(position_id) {
2949                log::error!(
2950                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
2951                );
2952                error_count += 1;
2953            }
2954        }
2955
2956        for position_id in &self.index.positions_open {
2957            if !self.positions.contains_key(position_id) {
2958                log::error!(
2959                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
2960                );
2961                error_count += 1;
2962            }
2963        }
2964
2965        for position_id in &self.index.positions_closed {
2966            if !self.positions.contains_key(position_id) {
2967                log::error!(
2968                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
2969                );
2970                error_count += 1;
2971            }
2972        }
2973
2974        for strategy_id in &self.index.strategies {
2975            if !self.index.strategy_orders.contains_key(strategy_id) {
2976                log::error!(
2977                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
2978                );
2979                error_count += 1;
2980            }
2981        }
2982
2983        for exec_algorithm_id in &self.index.exec_algorithms {
2984            if !self
2985                .index
2986                .exec_algorithm_orders
2987                .contains_key(exec_algorithm_id)
2988            {
2989                log::error!(
2990                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
2991                );
2992                error_count += 1;
2993            }
2994        }
2995
2996        let total_us = SystemTime::now()
2997            .duration_since(UNIX_EPOCH)
2998            .expect("Time went backwards")
2999            .as_micros()
3000            - timestamp_us;
3001
3002        if error_count == 0 {
3003            log::info!("Integrity check passed in {total_us}μs");
3004            true
3005        } else {
3006            log::error!(
3007                "Integrity check failed with {error_count} error{} in {total_us}μs",
3008                if error_count == 1 { "" } else { "s" },
3009            );
3010            false
3011        }
3012    }
3013
3014    /// Checks for any residual open state and log warnings if any are found.
3015    ///
3016    ///'Open state' is considered to be open orders and open positions.
3017    #[must_use]
3018    pub fn check_residuals(&self) -> bool {
3019        log::debug!("Checking residuals");
3020
3021        let mut residuals = false;
3022
3023        // Check for any open orders
3024        for order in self.orders_open(None, None, None, None, None) {
3025            residuals = true;
3026            log::warn!("Residual {order}");
3027        }
3028
3029        // Check for any open positions
3030        for position in self.positions_open(None, None, None, None, None) {
3031            residuals = true;
3032            log::warn!("Residual {position}");
3033        }
3034
3035        residuals
3036    }
3037
3038    /// Purges all closed orders from the cache that are older than `buffer_secs`.
3039    ///
3040    ///
3041    /// Only orders that have been closed for at least this amount of time will be purged.
3042    /// A value of 0 means purge all closed orders regardless of when they were closed.
3043    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
3044        log::debug!(
3045            "Purging closed orders{}",
3046            if buffer_secs > 0 {
3047                format!(" with buffer_secs={buffer_secs}")
3048            } else {
3049                String::new()
3050            }
3051        );
3052
3053        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
3054
3055        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
3056
3057        'outer: for client_order_id in self.index.orders_closed.clone() {
3058            let purge_target = self.orders.get(&client_order_id).and_then(|order_cell| {
3059                let order = order_cell.borrow();
3060                if order.is_closed()
3061                    && let Some(ts_closed) = order.ts_closed()
3062                    && ts_closed + buffer_ns <= ts_now
3063                {
3064                    let linked = order.linked_order_ids().map(<[_]>::to_vec);
3065                    let order_list_id = order.order_list_id();
3066                    Some((linked, order_list_id))
3067                } else {
3068                    None
3069                }
3070            });
3071
3072            let Some((linked, order_list_id)) = purge_target else {
3073                continue;
3074            };
3075
3076            // Check any linked orders (contingency orders)
3077            if let Some(linked_order_ids) = linked {
3078                for linked_order_id in &linked_order_ids {
3079                    if let Some(linked_order_cell) = self.orders.get(linked_order_id)
3080                        && linked_order_cell.borrow().is_open()
3081                    {
3082                        // Do not purge if linked order still open
3083                        continue 'outer;
3084                    }
3085                }
3086            }
3087
3088            if let Some(order_list_id) = order_list_id {
3089                affected_order_list_ids.insert(order_list_id);
3090            }
3091
3092            self.purge_order(client_order_id);
3093        }
3094
3095        for order_list_id in affected_order_list_ids {
3096            if let Some(order_list) = self.order_lists.get(&order_list_id) {
3097                let all_purged = order_list
3098                    .client_order_ids
3099                    .iter()
3100                    .all(|id| !self.orders.contains_key(id));
3101
3102                if all_purged {
3103                    self.order_lists.remove(&order_list_id);
3104                    log::info!("Purged {order_list_id}");
3105                }
3106            }
3107        }
3108    }
3109
3110    /// Purges all closed positions from the cache that are older than `buffer_secs`.
3111    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
3112        log::debug!(
3113            "Purging closed positions{}",
3114            if buffer_secs > 0 {
3115                format!(" with buffer_secs={buffer_secs}")
3116            } else {
3117                String::new()
3118            }
3119        );
3120
3121        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
3122
3123        for position_id in self.index.positions_closed.clone() {
3124            let should_purge = self.positions.get(&position_id).is_some_and(|cell| {
3125                let position = cell.borrow();
3126                position.is_closed()
3127                    && position
3128                        .ts_closed
3129                        .is_some_and(|ts_closed| ts_closed + buffer_ns <= ts_now)
3130            });
3131
3132            if should_purge {
3133                self.purge_position(position_id);
3134            }
3135        }
3136    }
3137
3138    /// Purges the order with the `client_order_id` from the cache (if found).
3139    ///
3140    /// For safety, an order is prevented from being purged if it's open.
3141    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
3142        // Check if order exists and is safe to purge before removing
3143        let order_cell = self.orders.get(&client_order_id).cloned();
3144
3145        // Prevent purging open orders
3146        if let Some(ref order_cell) = order_cell
3147            && order_cell.borrow().is_open()
3148        {
3149            log::warn!("Order {client_order_id} found open when purging, skipping purge");
3150            return;
3151        }
3152
3153        // If order exists in cache, remove it and clean up order-specific indices
3154        if let Some(ref order_cell) = order_cell {
3155            let order = order_cell.borrow();
3156            // Safe to purge
3157            self.orders.remove(&client_order_id);
3158
3159            // Remove order from venue index
3160            if let Some(venue_orders) = self
3161                .index
3162                .venue_orders
3163                .get_mut(&order.instrument_id().venue)
3164            {
3165                venue_orders.remove(&client_order_id);
3166                if venue_orders.is_empty() {
3167                    self.index.venue_orders.remove(&order.instrument_id().venue);
3168                }
3169            }
3170
3171            // Remove venue order ID index if exists
3172            if let Some(venue_order_id) = order.venue_order_id() {
3173                self.index.venue_order_ids.remove(&venue_order_id);
3174            }
3175
3176            // Remove from instrument orders index
3177            if let Some(instrument_orders) =
3178                self.index.instrument_orders.get_mut(&order.instrument_id())
3179            {
3180                instrument_orders.remove(&client_order_id);
3181                if instrument_orders.is_empty() {
3182                    self.index.instrument_orders.remove(&order.instrument_id());
3183                }
3184            }
3185
3186            // Remove from position orders index if associated with a position
3187            if let Some(position_id) = order.position_id()
3188                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
3189            {
3190                position_orders.remove(&client_order_id);
3191                if position_orders.is_empty() {
3192                    self.index.position_orders.remove(&position_id);
3193                }
3194            }
3195
3196            // Remove from exec algorithm orders index if it has an exec algorithm
3197            if let Some(exec_algorithm_id) = order.exec_algorithm_id()
3198                && let Some(exec_algorithm_orders) =
3199                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
3200            {
3201                exec_algorithm_orders.remove(&client_order_id);
3202                if exec_algorithm_orders.is_empty() {
3203                    self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
3204                }
3205            }
3206
3207            // Clean up strategy orders reverse index
3208            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&order.strategy_id())
3209            {
3210                strategy_orders.remove(&client_order_id);
3211                if strategy_orders.is_empty() {
3212                    self.index.strategy_orders.remove(&order.strategy_id());
3213                }
3214            }
3215
3216            // Clean up account orders index
3217            if let Some(account_id) = order.account_id()
3218                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
3219            {
3220                account_orders.remove(&client_order_id);
3221                if account_orders.is_empty() {
3222                    self.index.account_orders.remove(&account_id);
3223                }
3224            }
3225
3226            // Clean up exec spawn reverse index (if this order is a spawned child)
3227            if let Some(exec_spawn_id) = order.exec_spawn_id()
3228                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
3229            {
3230                spawn_orders.remove(&client_order_id);
3231                if spawn_orders.is_empty() {
3232                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
3233                }
3234            }
3235
3236            log::info!("Purged order {client_order_id}");
3237        } else {
3238            log::warn!("Order {client_order_id} not found when purging");
3239        }
3240
3241        // Always clean up order indices (even if order was not in cache)
3242        self.index.order_position.remove(&client_order_id);
3243        let strategy_id = self.index.order_strategy.remove(&client_order_id);
3244        self.index.order_client.remove(&client_order_id);
3245        self.index.client_order_ids.remove(&client_order_id);
3246
3247        // Clean up reverse index when order not in cache (using forward index)
3248        if let Some(strategy_id) = strategy_id
3249            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
3250        {
3251            strategy_orders.remove(&client_order_id);
3252            if strategy_orders.is_empty() {
3253                self.index.strategy_orders.remove(&strategy_id);
3254            }
3255        }
3256
3257        // Remove spawn parent entry if this order was a spawn root
3258        self.index.exec_spawn_orders.remove(&client_order_id);
3259
3260        self.index.orders.remove(&client_order_id);
3261        self.index.orders_active_local.remove(&client_order_id);
3262        self.index.orders_open.remove(&client_order_id);
3263        self.index.orders_closed.remove(&client_order_id);
3264        self.index.orders_emulated.remove(&client_order_id);
3265        self.index.orders_inflight.remove(&client_order_id);
3266        self.index.orders_pending_cancel.remove(&client_order_id);
3267    }
3268
3269    /// Purges the position with the `position_id` from the cache (if found).
3270    ///
3271    /// For safety, a position is prevented from being purged if it's open.
3272    pub fn purge_position(&mut self, position_id: PositionId) {
3273        // Snapshot the position so we can release the borrow before mutating indexes.
3274        let position = self
3275            .positions
3276            .get(&position_id)
3277            .map(|cell| cell.borrow().clone());
3278
3279        // Prevent purging open positions
3280        if let Some(ref pos) = position
3281            && pos.is_open()
3282        {
3283            log::warn!("Position {position_id} found open when purging, skipping purge");
3284            return;
3285        }
3286
3287        // If position exists in cache, remove it and clean up position-specific indices
3288        if let Some(ref pos) = position {
3289            self.positions.remove(&position_id);
3290
3291            // Remove from venue positions index
3292            if let Some(venue_positions) =
3293                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
3294            {
3295                venue_positions.remove(&position_id);
3296                if venue_positions.is_empty() {
3297                    self.index.venue_positions.remove(&pos.instrument_id.venue);
3298                }
3299            }
3300
3301            // Remove from instrument positions index
3302            if let Some(instrument_positions) =
3303                self.index.instrument_positions.get_mut(&pos.instrument_id)
3304            {
3305                instrument_positions.remove(&position_id);
3306                if instrument_positions.is_empty() {
3307                    self.index.instrument_positions.remove(&pos.instrument_id);
3308                }
3309            }
3310
3311            // Remove from strategy positions index
3312            if let Some(strategy_positions) =
3313                self.index.strategy_positions.get_mut(&pos.strategy_id)
3314            {
3315                strategy_positions.remove(&position_id);
3316                if strategy_positions.is_empty() {
3317                    self.index.strategy_positions.remove(&pos.strategy_id);
3318                }
3319            }
3320
3321            // Remove from account positions index
3322            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
3323                account_positions.remove(&position_id);
3324                if account_positions.is_empty() {
3325                    self.index.account_positions.remove(&pos.account_id);
3326                }
3327            }
3328
3329            // Remove position ID from orders that reference it
3330            for client_order_id in pos.client_order_ids() {
3331                self.index.order_position.remove(&client_order_id);
3332            }
3333
3334            log::info!("Purged position {position_id}");
3335        } else {
3336            log::warn!("Position {position_id} not found when purging");
3337        }
3338
3339        // Always clean up position indices (even if position not in cache)
3340        self.index.position_strategy.remove(&position_id);
3341        self.index.position_orders.remove(&position_id);
3342        self.index.positions.remove(&position_id);
3343        self.index.positions_open.remove(&position_id);
3344        self.index.positions_closed.remove(&position_id);
3345
3346        // Always clean up position snapshots (even if position not in cache)
3347        self.position_snapshots.remove(&position_id);
3348    }
3349
3350    /// Purges the instrument with the `instrument_id` from the cache (if found).
3351    ///
3352    /// All cache-owned data keyed by the instrument is removed: the instrument record,
3353    /// any synthetic with the same id, order book and own-order-book state, quote/trade
3354    /// histories, mark/index/funding price histories, instrument status, bars for any
3355    /// `BarType` referencing the instrument, and the `instrument_orders` /
3356    /// `instrument_positions` index entries.
3357    ///
3358    /// For safety, an instrument is prevented from being purged while any associated
3359    /// order is non-terminal (anything not in `orders_closed`, including
3360    /// initialized, submitted, accepted, emulated, released, or inflight states) or
3361    /// any associated position is non-closed.
3362    ///
3363    /// Active subscriptions and other live data-engine state are not touched here;
3364    /// those belong to the data and execution engines.
3365    ///
3366    /// # Warning
3367    ///
3368    /// Intended for actors and strategies that have their own lifecycle logic for
3369    /// deciding when an instrument is no longer needed. Purging an instrument that any
3370    /// other actor, strategy, or engine still relies on may cause incorrect behavior
3371    /// (missing instrument lookups, lost market-data history). The caller is
3372    /// responsible for ensuring the instrument is no longer in use before purging.
3373    fn purge_instrument_inner(&mut self, instrument_id: InstrumentId, skip_order_guard: bool) {
3374        #[cfg(feature = "defi")]
3375        let defi_found = self.defi.pools.contains_key(&instrument_id)
3376            || self.defi.pool_profilers.contains_key(&instrument_id);
3377        #[cfg(not(feature = "defi"))]
3378        let defi_found = false;
3379
3380        let found = self.instruments.contains_key(&instrument_id)
3381            || self.synthetics.contains_key(&instrument_id)
3382            || defi_found;
3383
3384        if !found {
3385            log::warn!("Instrument {instrument_id} not found when purging");
3386            return;
3387        }
3388
3389        if !skip_order_guard && let Some(orders) = self.index.instrument_orders.get(&instrument_id)
3390        {
3391            let has_non_terminal = orders
3392                .iter()
3393                .any(|client_order_id| !self.index.orders_closed.contains(client_order_id));
3394
3395            if has_non_terminal {
3396                log::warn!(
3397                    "Instrument {instrument_id} has non-terminal orders when purging, skipping purge"
3398                );
3399                return;
3400            }
3401        }
3402
3403        if let Some(positions) = self.index.instrument_positions.get(&instrument_id) {
3404            let has_non_closed = positions
3405                .iter()
3406                .any(|position_id| !self.index.positions_closed.contains(position_id));
3407
3408            if has_non_closed {
3409                log::warn!(
3410                    "Instrument {instrument_id} has non-closed positions when purging, skipping purge"
3411                );
3412                return;
3413            }
3414        }
3415
3416        self.instruments.remove(&instrument_id);
3417        self.synthetics.remove(&instrument_id);
3418        self.books.remove(&instrument_id);
3419        self.own_books.remove(&instrument_id);
3420        self.quotes.remove(&instrument_id);
3421        self.trades.remove(&instrument_id);
3422        self.mark_prices.remove(&instrument_id);
3423        self.index_prices.remove(&instrument_id);
3424        self.funding_rates.remove(&instrument_id);
3425        self.instrument_statuses.remove(&instrument_id);
3426        self.greeks.remove(&instrument_id);
3427        self.option_greeks.remove(&instrument_id);
3428
3429        self.bars
3430            .retain(|bar_type, _| bar_type.instrument_id() != instrument_id);
3431
3432        #[cfg(feature = "defi")]
3433        {
3434            self.defi.pools.remove(&instrument_id);
3435            self.defi.pool_profilers.remove(&instrument_id);
3436        }
3437
3438        self.index.instrument_orders.remove(&instrument_id);
3439        self.index.instrument_positions.remove(&instrument_id);
3440
3441        log::info!("Purged instrument {instrument_id}");
3442    }
3443
3444    /// Purges the instrument with the `instrument_id` from the cache.
3445    ///
3446    /// This refuses to purge when associated orders or positions remain in
3447    /// non-terminal state.
3448    pub fn purge_instrument(&mut self, instrument_id: InstrumentId) {
3449        self.purge_instrument_inner(instrument_id, false);
3450    }
3451
3452    /// Purges the instrument with the `instrument_id` from the cache while skipping the
3453    /// non-terminal order guard.
3454    ///
3455    /// This still refuses to purge when any associated position is non-closed. Intended
3456    /// for actors which own an instrument-expiration lifecycle and have already invalidated
3457    /// any remaining order state externally, but may still observe order-terminal events
3458    /// arriving later than the cleanup decision. During that window, the order objects may
3459    /// still exist even though `instrument_orders` is removed from the cache index.
3460    pub fn purge_instrument_skip_order_guard(&mut self, instrument_id: InstrumentId) {
3461        self.purge_instrument_inner(instrument_id, true);
3462    }
3463
3464    /// Purges all account state events which are outside the lookback window.
3465    ///
3466    /// Only events which are outside the lookback window will be purged.
3467    /// A value of 0 means purge all account state events.
3468    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
3469        log::debug!(
3470            "Purging account events{}",
3471            if lookback_secs > 0 {
3472                format!(" with lookback_secs={lookback_secs}")
3473            } else {
3474                String::new()
3475            }
3476        );
3477
3478        for account_cell in self.accounts.values() {
3479            let mut account = account_cell.borrow_mut();
3480            let event_count = account.event_count();
3481            account.purge_account_events(ts_now, lookback_secs);
3482            let count_diff = event_count - account.event_count();
3483            if count_diff > 0 {
3484                log::info!(
3485                    "Purged {} event(s) from account {}",
3486                    count_diff,
3487                    account.id()
3488                );
3489            }
3490        }
3491    }
3492
3493    /// Clears the caches index.
3494    pub fn clear_index(&mut self) {
3495        self.index.clear();
3496        log::debug!("Cleared index");
3497    }
3498
3499    /// Resets the cache.
3500    ///
3501    /// All stateful fields are reset to their initial value. Instruments,
3502    /// currencies and synthetics are retained when `drop_instruments_on_reset`
3503    /// is `false` so that repeated backtest runs can reuse the same dataset.
3504    pub fn reset(&mut self) {
3505        log::debug!("Resetting cache");
3506
3507        self.general.clear();
3508        self.books.clear();
3509        self.own_books.clear();
3510        self.quotes.clear();
3511        self.trades.clear();
3512        self.mark_xrates.clear();
3513        self.mark_prices.clear();
3514        self.index_prices.clear();
3515        self.funding_rates.clear();
3516        self.instrument_statuses.clear();
3517        self.bars.clear();
3518        self.accounts.clear();
3519        self.orders.clear();
3520        self.order_lists.clear();
3521        self.positions.clear();
3522        self.position_snapshots.clear();
3523        self.greeks.clear();
3524        self.yield_curves.clear();
3525
3526        if self.config.drop_instruments_on_reset {
3527            self.currencies.clear();
3528            self.instruments.clear();
3529            self.synthetics.clear();
3530        }
3531
3532        #[cfg(feature = "defi")]
3533        {
3534            self.defi.pools.clear();
3535            self.defi.pool_profilers.clear();
3536        }
3537
3538        self.clear_index();
3539
3540        log::info!("Reset cache");
3541    }
3542
3543    /// Dispose of the cache which will close any underlying database adapter.
3544    ///
3545    /// If closing the database connection fails, an error is logged.
3546    pub fn dispose(&mut self) {
3547        self.reset();
3548
3549        if let Some(database) = &mut self.database
3550            && let Err(e) = database.close()
3551        {
3552            log::error!("Failed to close database during dispose: {e}");
3553        }
3554    }
3555
3556    /// Flushes the caches database which permanently removes all persisted data.
3557    ///
3558    /// If flushing the database connection fails, an error is logged.
3559    pub fn flush_db(&mut self) {
3560        if let Some(database) = &mut self.database
3561            && let Err(e) = database.flush()
3562        {
3563            log::error!("Failed to flush database: {e}");
3564        }
3565    }
3566
3567    /// Adds a raw bytes `value` to the cache under the `key`.
3568    ///
3569    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
3570    ///
3571    /// # Errors
3572    ///
3573    /// Returns an error if persisting the entry to the backing database fails.
3574    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
3575        check_valid_string_ascii(key, stringify!(key))?;
3576        check_predicate_false(value.is_empty(), stringify!(value))?;
3577
3578        log::debug!("Adding general {key}");
3579        self.general.insert(key.to_string(), value.clone());
3580
3581        if let Some(database) = &mut self.database {
3582            database.add(key.to_string(), value)?;
3583        }
3584        Ok(())
3585    }
3586
3587    /// Adds an `OrderBook` to the cache.
3588    ///
3589    /// # Errors
3590    ///
3591    /// Returns an error if persisting the order book to the backing database fails.
3592    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
3593        log::debug!("Adding `OrderBook` {}", book.instrument_id);
3594
3595        if self.config.save_market_data
3596            && let Some(database) = &mut self.database
3597        {
3598            database.add_order_book(&book)?;
3599        }
3600
3601        self.books.insert(book.instrument_id, book);
3602        Ok(())
3603    }
3604
3605    /// Adds an `OwnOrderBook` to the cache.
3606    ///
3607    /// # Errors
3608    ///
3609    /// Returns an error if persisting the own order book fails.
3610    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
3611        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
3612
3613        self.own_books.insert(own_book.instrument_id, own_book);
3614        Ok(())
3615    }
3616
3617    /// Adds the `mark_price` update to the cache.
3618    ///
3619    /// # Errors
3620    ///
3621    /// Returns an error if persisting the mark price to the backing database fails.
3622    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
3623        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
3624
3625        if self.config.save_market_data {
3626            // TODO: Placeholder and return Result for consistency
3627        }
3628
3629        let mark_prices_deque = self
3630            .mark_prices
3631            .entry(mark_price.instrument_id)
3632            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3633        mark_prices_deque.push_front(mark_price);
3634        Ok(())
3635    }
3636
3637    /// Adds the `index_price` update to the cache.
3638    ///
3639    /// # Errors
3640    ///
3641    /// Returns an error if persisting the index price to the backing database fails.
3642    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
3643        log::debug!(
3644            "Adding `IndexPriceUpdate` for {}",
3645            index_price.instrument_id
3646        );
3647
3648        if self.config.save_market_data {
3649            // TODO: Placeholder and return Result for consistency
3650        }
3651
3652        let index_prices_deque = self
3653            .index_prices
3654            .entry(index_price.instrument_id)
3655            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3656        index_prices_deque.push_front(index_price);
3657        Ok(())
3658    }
3659
3660    /// Adds the `funding_rate` update to the cache.
3661    ///
3662    /// # Errors
3663    ///
3664    /// Returns an error if persisting the funding rate update to the backing database fails.
3665    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
3666        log::debug!(
3667            "Adding `FundingRateUpdate` for {}",
3668            funding_rate.instrument_id
3669        );
3670
3671        if self.config.save_market_data {
3672            // TODO: Placeholder and return Result for consistency
3673        }
3674
3675        let funding_rates_deque = self
3676            .funding_rates
3677            .entry(funding_rate.instrument_id)
3678            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3679        funding_rates_deque.push_front(funding_rate);
3680        Ok(())
3681    }
3682
3683    /// Adds the given `funding rates` to the cache.
3684    ///
3685    /// # Errors
3686    ///
3687    /// Returns an error if persisting the trade ticks to the backing database fails.
3688    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
3689        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
3690
3691        let instrument_id = funding_rates[0].instrument_id;
3692        log::debug!(
3693            "Adding `FundingRateUpdate`[{}] {instrument_id}",
3694            funding_rates.len()
3695        );
3696
3697        if self.config.save_market_data
3698            && let Some(database) = &mut self.database
3699        {
3700            for funding_rate in funding_rates {
3701                database.add_funding_rate(funding_rate)?;
3702            }
3703        }
3704
3705        let funding_rate_deque = self
3706            .funding_rates
3707            .entry(instrument_id)
3708            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3709
3710        for funding_rate in funding_rates {
3711            funding_rate_deque.push_front(*funding_rate);
3712        }
3713        Ok(())
3714    }
3715
3716    /// Adds the `instrument_status` update to the cache.
3717    ///
3718    /// # Errors
3719    ///
3720    /// Returns an error if persisting the instrument status to the backing database fails.
3721    pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
3722        log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
3723
3724        if self.config.save_market_data {
3725            // TODO: Placeholder and return Result for consistency
3726        }
3727
3728        let statuses_deque = self
3729            .instrument_statuses
3730            .entry(status.instrument_id)
3731            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3732        statuses_deque.push_front(status);
3733        Ok(())
3734    }
3735
3736    /// Adds the `quote` tick to the cache.
3737    ///
3738    /// # Errors
3739    ///
3740    /// Returns an error if persisting the quote tick to the backing database fails.
3741    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
3742        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
3743
3744        if self.config.save_market_data
3745            && let Some(database) = &mut self.database
3746        {
3747            database.add_quote(&quote)?;
3748        }
3749
3750        let quotes_deque = self
3751            .quotes
3752            .entry(quote.instrument_id)
3753            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3754        quotes_deque.push_front(quote);
3755        Ok(())
3756    }
3757
3758    /// Adds the `quotes` to the cache.
3759    ///
3760    /// # Errors
3761    ///
3762    /// Returns an error if persisting the quote ticks to the backing database fails.
3763    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
3764        check_slice_not_empty(quotes, stringify!(quotes))?;
3765
3766        let instrument_id = quotes[0].instrument_id;
3767        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
3768
3769        if self.config.save_market_data
3770            && let Some(database) = &mut self.database
3771        {
3772            for quote in quotes {
3773                database.add_quote(quote)?;
3774            }
3775        }
3776
3777        let quotes_deque = self
3778            .quotes
3779            .entry(instrument_id)
3780            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3781
3782        for quote in quotes {
3783            quotes_deque.push_front(*quote);
3784        }
3785        Ok(())
3786    }
3787
3788    /// Adds the `trade` tick to the cache.
3789    ///
3790    /// # Errors
3791    ///
3792    /// Returns an error if persisting the trade tick to the backing database fails.
3793    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
3794        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
3795
3796        if self.config.save_market_data
3797            && let Some(database) = &mut self.database
3798        {
3799            database.add_trade(&trade)?;
3800        }
3801
3802        let trades_deque = self
3803            .trades
3804            .entry(trade.instrument_id)
3805            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3806        trades_deque.push_front(trade);
3807        Ok(())
3808    }
3809
3810    /// Adds the give `trades` to the cache.
3811    ///
3812    /// # Errors
3813    ///
3814    /// Returns an error if persisting the trade ticks to the backing database fails.
3815    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
3816        check_slice_not_empty(trades, stringify!(trades))?;
3817
3818        let instrument_id = trades[0].instrument_id;
3819        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
3820
3821        if self.config.save_market_data
3822            && let Some(database) = &mut self.database
3823        {
3824            for trade in trades {
3825                database.add_trade(trade)?;
3826            }
3827        }
3828
3829        let trades_deque = self
3830            .trades
3831            .entry(instrument_id)
3832            .or_insert_with(|| BoundedVecDeque::new(self.config.tick_capacity));
3833
3834        for trade in trades {
3835            trades_deque.push_front(*trade);
3836        }
3837        Ok(())
3838    }
3839
3840    /// Adds the `bar` to the cache.
3841    ///
3842    /// # Errors
3843    ///
3844    /// Returns an error if persisting the bar to the backing database fails.
3845    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
3846        log::debug!("Adding `Bar` {}", bar.bar_type);
3847
3848        if self.config.save_market_data
3849            && let Some(database) = &mut self.database
3850        {
3851            database.add_bar(&bar)?;
3852        }
3853
3854        let bars = self
3855            .bars
3856            .entry(bar.bar_type)
3857            .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
3858        bars.push_front(bar);
3859        Ok(())
3860    }
3861
3862    /// Adds the `bars` to the cache.
3863    ///
3864    /// # Errors
3865    ///
3866    /// Returns an error if persisting the bars to the backing database fails.
3867    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
3868        check_slice_not_empty(bars, stringify!(bars))?;
3869
3870        let bar_type = bars[0].bar_type;
3871        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
3872
3873        if self.config.save_market_data
3874            && let Some(database) = &mut self.database
3875        {
3876            for bar in bars {
3877                database.add_bar(bar)?;
3878            }
3879        }
3880
3881        let bars_deque = self
3882            .bars
3883            .entry(bar_type)
3884            .or_insert_with(|| BoundedVecDeque::new(self.config.bar_capacity));
3885
3886        for bar in bars {
3887            bars_deque.push_front(*bar);
3888        }
3889        Ok(())
3890    }
3891
3892    /// Adds the `greeks` data to the cache.
3893    ///
3894    /// # Errors
3895    ///
3896    /// Returns an error if persisting the greeks data to the backing database fails.
3897    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
3898        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
3899
3900        if self.config.save_market_data
3901            && let Some(_database) = &mut self.database
3902        {
3903            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
3904        }
3905
3906        self.greeks.insert(greeks.instrument_id, greeks);
3907        Ok(())
3908    }
3909
3910    /// Gets the greeks data for the `instrument_id`.
3911    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
3912        self.greeks.get(instrument_id).cloned()
3913    }
3914
3915    /// Adds exchange-provided option greeks to the cache.
3916    pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
3917        log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
3918        self.option_greeks.insert(greeks.instrument_id, greeks);
3919    }
3920
3921    /// Gets a reference to the exchange-provided option greeks for the `instrument_id`.
3922    #[must_use]
3923    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
3924        self.option_greeks.get(instrument_id)
3925    }
3926
3927    /// Adds the `yield_curve` data to the cache.
3928    ///
3929    /// # Errors
3930    ///
3931    /// Returns an error if persisting the yield curve data to the backing database fails.
3932    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
3933        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
3934
3935        if self.config.save_market_data
3936            && let Some(_database) = &mut self.database
3937        {
3938            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
3939        }
3940
3941        self.yield_curves
3942            .insert(yield_curve.curve_name.clone(), yield_curve);
3943        Ok(())
3944    }
3945
3946    /// Gets the yield curve for the `key`.
3947    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
3948        self.yield_curves.get(key).map(|curve| {
3949            let curve_clone = curve.clone();
3950            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
3951                as Box<dyn Fn(f64) -> f64>
3952        })
3953    }
3954
3955    /// Adds the `currency` to the cache.
3956    ///
3957    /// # Errors
3958    ///
3959    /// Returns an error if persisting the currency to the backing database fails.
3960    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
3961        if self.currencies.contains_key(&currency.code) {
3962            return Ok(());
3963        }
3964        log::debug!("Adding `Currency` {}", currency.code);
3965
3966        if let Some(database) = &mut self.database {
3967            database.add_currency(&currency)?;
3968        }
3969
3970        self.currencies.insert(currency.code, currency);
3971        Ok(())
3972    }
3973
3974    /// Adds the `instrument` to the cache.
3975    ///
3976    /// # Errors
3977    ///
3978    /// Returns an error if persisting the instrument to the backing database fails.
3979    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
3980        log::debug!("Adding `Instrument` {}", instrument.id());
3981
3982        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
3983        if let Some(base_currency) = instrument.base_currency() {
3984            self.add_currency(base_currency)?;
3985        }
3986        self.add_currency(instrument.quote_currency())?;
3987        self.add_currency(instrument.settlement_currency())?;
3988
3989        if let Some(database) = &mut self.database {
3990            database.add_instrument(&instrument)?;
3991        }
3992
3993        self.instruments.insert(instrument.id(), instrument);
3994        Ok(())
3995    }
3996
3997    /// Adds the `synthetic` instrument to the cache.
3998    ///
3999    /// # Errors
4000    ///
4001    /// Returns an error if persisting the synthetic instrument to the backing database fails.
4002    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
4003        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
4004
4005        if let Some(database) = &mut self.database {
4006            database.add_synthetic(&synthetic)?;
4007        }
4008
4009        self.synthetics.insert(synthetic.id, synthetic);
4010        Ok(())
4011    }
4012
4013    /// Adds the `account` to the cache.
4014    ///
4015    /// # Errors
4016    ///
4017    /// Returns an error if persisting the account to the backing database fails.
4018    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
4019        log::debug!("Adding `Account` {}", account.id());
4020
4021        if let Some(database) = &mut self.database {
4022            database.add_account(&account)?;
4023        }
4024
4025        let account_id = account.id();
4026        self.accounts.insert(account_id, SharedCell::new(account));
4027        self.index
4028            .venue_account
4029            .insert(account_id.get_issuer(), account_id);
4030        Ok(())
4031    }
4032
4033    /// Indexes the `client_order_id` with the `venue_order_id`.
4034    ///
4035    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
4036    ///
4037    /// # Errors
4038    ///
4039    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
4040    pub fn add_venue_order_id(
4041        &mut self,
4042        client_order_id: &ClientOrderId,
4043        venue_order_id: &VenueOrderId,
4044        overwrite: bool,
4045    ) -> anyhow::Result<()> {
4046        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
4047            && !overwrite
4048            && existing_venue_order_id != venue_order_id
4049        {
4050            anyhow::bail!(
4051                "Existing {existing_venue_order_id} for {client_order_id}
4052                    did not match the given {venue_order_id}.
4053                    If you are writing a test then try a different `venue_order_id`,
4054                    otherwise this is probably a bug."
4055            );
4056        }
4057
4058        self.index
4059            .client_order_ids
4060            .insert(*client_order_id, *venue_order_id);
4061        self.index
4062            .venue_order_ids
4063            .insert(*venue_order_id, *client_order_id);
4064
4065        Ok(())
4066    }
4067
4068    /// Adds the `order` to the cache indexed with any given identifiers.
4069    ///
4070    /// # Parameters
4071    ///
4072    /// `override_existing`: If the added order should 'override' any existing order and replace
4073    /// it in the cache. This is currently used for emulated orders which are
4074    /// being released and transformed into another type.
4075    ///
4076    /// # Errors
4077    ///
4078    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
4079    pub fn add_order(
4080        &mut self,
4081        order: OrderAny,
4082        position_id: Option<PositionId>,
4083        client_id: Option<ClientId>,
4084        replace_existing: bool,
4085    ) -> anyhow::Result<()> {
4086        let instrument_id = order.instrument_id();
4087        let venue = instrument_id.venue;
4088        let client_order_id = order.client_order_id();
4089        let strategy_id = order.strategy_id();
4090        let exec_algorithm_id = order.exec_algorithm_id();
4091        let exec_spawn_id = order.exec_spawn_id();
4092
4093        if !replace_existing {
4094            check_key_not_in_map(
4095                &client_order_id,
4096                &self.orders,
4097                stringify!(client_order_id),
4098                stringify!(orders),
4099            )?;
4100        }
4101
4102        log::debug!("Adding {order:?}");
4103
4104        self.index.orders.insert(client_order_id);
4105
4106        if order.is_active_local() {
4107            self.index.orders_active_local.insert(client_order_id);
4108        }
4109        self.index
4110            .order_strategy
4111            .insert(client_order_id, strategy_id);
4112        self.index.strategies.insert(strategy_id);
4113
4114        // Update venue -> orders index
4115        self.index
4116            .venue_orders
4117            .entry(venue)
4118            .or_default()
4119            .insert(client_order_id);
4120
4121        // Update instrument -> orders index
4122        self.index
4123            .instrument_orders
4124            .entry(instrument_id)
4125            .or_default()
4126            .insert(client_order_id);
4127
4128        // Update strategy -> orders index
4129        self.index
4130            .strategy_orders
4131            .entry(strategy_id)
4132            .or_default()
4133            .insert(client_order_id);
4134
4135        // Update account -> orders index (if account_id known at creation)
4136        if let Some(account_id) = order.account_id() {
4137            self.index
4138                .account_orders
4139                .entry(account_id)
4140                .or_default()
4141                .insert(client_order_id);
4142        }
4143
4144        // Update exec_algorithm -> orders index
4145        if let Some(exec_algorithm_id) = exec_algorithm_id {
4146            self.index.exec_algorithms.insert(exec_algorithm_id);
4147
4148            self.index
4149                .exec_algorithm_orders
4150                .entry(exec_algorithm_id)
4151                .or_default()
4152                .insert(client_order_id);
4153        }
4154
4155        // Update exec_spawn -> orders index
4156        if let Some(exec_spawn_id) = exec_spawn_id {
4157            self.index
4158                .exec_spawn_orders
4159                .entry(exec_spawn_id)
4160                .or_default()
4161                .insert(client_order_id);
4162        }
4163
4164        // Update emulation index
4165        if let Some(emulation_trigger) = order.emulation_trigger()
4166            && emulation_trigger != TriggerType::NoTrigger
4167        {
4168            self.index.orders_emulated.insert(client_order_id);
4169        }
4170
4171        // Index position ID if provided
4172        if let Some(position_id) = position_id {
4173            self.add_position_id(
4174                &position_id,
4175                &order.instrument_id().venue,
4176                &client_order_id,
4177                &strategy_id,
4178            )?;
4179        }
4180
4181        // Index client ID if provided
4182        if let Some(client_id) = client_id {
4183            self.index.order_client.insert(client_order_id, client_id);
4184            log::debug!("Indexed {client_id:?}");
4185        }
4186
4187        if let Some(database) = &mut self.database {
4188            database.add_order(&order, client_id)?;
4189            // TODO: Implement
4190            // if self.config.snapshot_orders {
4191            //     database.snapshot_order_state(order)?;
4192            // }
4193        }
4194
4195        match self.orders.get(&client_order_id) {
4196            // Reuse the existing cell on replace so the canonical entry stays in place
4197            // rather than orphaning a stale cell.
4198            Some(order_cell) => *order_cell.borrow_mut() = order,
4199            None => {
4200                self.orders.insert(client_order_id, SharedCell::new(order));
4201            }
4202        }
4203
4204        Ok(())
4205    }
4206
4207    /// Adds the `order_list` to the cache.
4208    ///
4209    /// # Errors
4210    ///
4211    /// Returns an error if the order list ID is already contained in the cache.
4212    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
4213        let order_list_id = order_list.id;
4214        check_key_not_in_map(
4215            &order_list_id,
4216            &self.order_lists,
4217            stringify!(order_list_id),
4218            stringify!(order_lists),
4219        )?;
4220
4221        log::debug!("Adding {order_list:?}");
4222        self.order_lists.insert(order_list_id, order_list);
4223        Ok(())
4224    }
4225
4226    /// Indexes the `position_id` with the other given IDs.
4227    ///
4228    /// # Errors
4229    ///
4230    /// Returns an error if indexing position ID in the backing database fails.
4231    pub fn add_position_id(
4232        &mut self,
4233        position_id: &PositionId,
4234        venue: &Venue,
4235        client_order_id: &ClientOrderId,
4236        strategy_id: &StrategyId,
4237    ) -> anyhow::Result<()> {
4238        self.index
4239            .order_position
4240            .insert(*client_order_id, *position_id);
4241
4242        // Index: ClientOrderId -> PositionId
4243        if let Some(database) = &mut self.database {
4244            database.index_order_position(*client_order_id, *position_id)?;
4245        }
4246
4247        // Index: PositionId -> StrategyId
4248        self.index
4249            .position_strategy
4250            .insert(*position_id, *strategy_id);
4251
4252        // Index: PositionId -> set[ClientOrderId]
4253        self.index
4254            .position_orders
4255            .entry(*position_id)
4256            .or_default()
4257            .insert(*client_order_id);
4258
4259        // Index: StrategyId -> set[PositionId]
4260        self.index
4261            .strategy_positions
4262            .entry(*strategy_id)
4263            .or_default()
4264            .insert(*position_id);
4265
4266        // Index: Venue -> set[PositionId]
4267        self.index
4268            .venue_positions
4269            .entry(*venue)
4270            .or_default()
4271            .insert(*position_id);
4272
4273        Ok(())
4274    }
4275
4276    // Propagates parent OTO `position_id` to contingent children that are missing one.
4277    //
4278    // Recovers from a partial-write window during fill handling: the fill-time path in the
4279    // execution engine assigns `position_id` to each contingent child in a non-atomic loop
4280    // (`set_position_id` then `add_position_id`), so a crash mid-loop can leave the database
4281    // with the parent updated and some children un-updated. This pass re-applies any missing
4282    // assignments after load. Mirrors the Cython behaviour at
4283    // `nautilus_trader/cache/cache.pyx::_assign_position_id_to_contingencies`.
4284    fn assign_position_ids_to_contingencies(&mut self) {
4285        let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
4286
4287        for parent_order_cell in self.orders.values() {
4288            let parent = parent_order_cell.borrow();
4289            if parent.contingency_type() != Some(ContingencyType::Oto) {
4290                continue;
4291            }
4292            let Some(parent_position_id) = parent.position_id() else {
4293                continue;
4294            };
4295            let Some(linked_order_ids) = parent.linked_order_ids() else {
4296                continue;
4297            };
4298
4299            for client_order_id in linked_order_ids {
4300                match self.orders.get(client_order_id) {
4301                    None => {
4302                        log::error!("Contingency order {client_order_id} not found");
4303                    }
4304                    Some(contingent_order_cell) => {
4305                        if contingent_order_cell.borrow().position_id().is_none() {
4306                            assignments.push((parent_position_id, *client_order_id));
4307                        }
4308                    }
4309                }
4310            }
4311        }
4312
4313        for (position_id, client_order_id) in assignments {
4314            let Some((venue, strategy_id)) = self.orders.get(&client_order_id).map(|order_cell| {
4315                let mut contingent = order_cell.borrow_mut();
4316                contingent.set_position_id(Some(position_id));
4317                (contingent.instrument_id().venue, contingent.strategy_id())
4318            }) else {
4319                continue;
4320            };
4321
4322            // Re-indexing through `add_position_id` also replays the database write, making the
4323            // recovered assignment durable across another restart.
4324            if let Err(e) =
4325                self.add_position_id(&position_id, &venue, &client_order_id, &strategy_id)
4326            {
4327                log::error!("Failed to re-index {client_order_id} -> {position_id}: {e}");
4328            }
4329        }
4330    }
4331
4332    /// Adds the `position` to the cache.
4333    ///
4334    /// # Errors
4335    ///
4336    /// Returns an error if persisting the position to the backing database fails.
4337    pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
4338        self.positions
4339            .insert(position.id, SharedCell::new(position.clone()));
4340        self.index.positions.insert(position.id);
4341        self.index.positions_open.insert(position.id);
4342        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
4343
4344        log::debug!("Adding {position}");
4345
4346        self.add_position_id(
4347            &position.id,
4348            &position.instrument_id.venue,
4349            &position.opening_order_id,
4350            &position.strategy_id,
4351        )?;
4352
4353        let venue = position.instrument_id.venue;
4354        let venue_positions = self.index.venue_positions.entry(venue).or_default();
4355        venue_positions.insert(position.id);
4356
4357        // Index: InstrumentId -> AHashSet
4358        let instrument_id = position.instrument_id;
4359        let instrument_positions = self
4360            .index
4361            .instrument_positions
4362            .entry(instrument_id)
4363            .or_default();
4364        instrument_positions.insert(position.id);
4365
4366        // Index: AccountId -> AHashSet<PositionId>
4367        self.index
4368            .account_positions
4369            .entry(position.account_id)
4370            .or_default()
4371            .insert(position.id);
4372
4373        if let Some(database) = &mut self.database {
4374            database.add_position(position)?;
4375            // TODO: Implement position snapshots
4376            // if self.snapshot_positions {
4377            //     database.snapshot_position_state(
4378            //         position,
4379            //         position.ts_last,
4380            //         self.calculate_unrealized_pnl(&position),
4381            //     )?;
4382            // }
4383        }
4384
4385        Ok(())
4386    }
4387
4388    /// Updates the `account` in the cache.
4389    ///
4390    /// Reuses the existing cell when present so any held [`AccountRef`] handles continue to point
4391    /// at the canonical entry; only inserts a new cell when the account is unknown.
4392    ///
4393    /// # Errors
4394    ///
4395    /// Returns an error if updating the account in the database fails.
4396    pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
4397        let account_id = account.id();
4398        match self.accounts.get(&account_id) {
4399            Some(account_cell) => *account_cell.borrow_mut() = account.clone(),
4400            None => {
4401                self.accounts
4402                    .insert(account_id, SharedCell::new(account.clone()));
4403            }
4404        }
4405
4406        if let Some(database) = &mut self.database {
4407            database.update_account(account)?;
4408        }
4409        Ok(())
4410    }
4411
4412    /// Removes the `account` from the cache and returns it.
4413    ///
4414    /// This supports hot paths which need owned account mutation without
4415    /// cloning the account event history. The cache is the sole owner of the
4416    /// account cell (the field is private and accessors only hand out
4417    /// lifetime-scoped [`AccountRef`] borrows), so the value is moved out of
4418    /// its cell rather than cloned.
4419    ///
4420    /// # Panics
4421    ///
4422    /// Panics if the cache no longer holds the only strong handle to the
4423    /// account cell. This indicates an internal invariant violation: some
4424    /// component cloned the underlying [`SharedCell`] and held it past the
4425    /// scope of a single cache method.
4426    #[must_use]
4427    pub fn take_account(&mut self, account_id: &AccountId) -> Option<AccountAny> {
4428        self.accounts.remove(account_id).map(|cell| {
4429            let rc: Rc<RefCell<AccountAny>> = cell.into();
4430            Rc::try_unwrap(rc).map_or_else(
4431                |_| panic!("take_account: cache must be sole owner of {account_id} cell"),
4432                RefCell::into_inner,
4433            )
4434        })
4435    }
4436
4437    /// Caches the `account` in memory without updating the database.
4438    pub fn cache_account_owned(&mut self, account: AccountAny) {
4439        let account_id = account.id();
4440        self.index
4441            .venue_account
4442            .insert(account_id.get_issuer(), account_id);
4443        match self.accounts.get(&account_id) {
4444            Some(account_cell) => *account_cell.borrow_mut() = account,
4445            None => {
4446                self.accounts.insert(account_id, SharedCell::new(account));
4447            }
4448        }
4449    }
4450
4451    /// Updates the `account` in the cache, taking ownership of the updated account.
4452    ///
4453    /// # Errors
4454    ///
4455    /// Returns an error if updating the account in the database fails.
4456    pub fn update_account_owned(&mut self, account: AccountAny) -> anyhow::Result<()> {
4457        let account_id = account.id();
4458        self.cache_account_owned(account);
4459
4460        if let Some(database) = &mut self.database {
4461            let Some(account_cell) = self.accounts.get(&account_id) else {
4462                anyhow::bail!("Account {account_id} not found after cache update");
4463            };
4464            database.update_account(&account_cell.borrow())?;
4465        }
4466        Ok(())
4467    }
4468
4469    /// Applies an account state event to the cached account.
4470    ///
4471    /// Mutates the cached account in place to avoid cloning the account event
4472    /// history on the hot path; long-running sessions accumulate many events
4473    /// per account, so a snapshot-clone here would be O(history) per update.
4474    ///
4475    /// # Errors
4476    ///
4477    /// Returns an error if applying or persisting the account state fails.
4478    pub fn update_account_state(&mut self, event: &AccountState) -> anyhow::Result<()> {
4479        let Some(cell) = self.accounts.get(&event.account_id) else {
4480            return self.add_account(AccountAny::from_events(std::slice::from_ref(event))?);
4481        };
4482
4483        cell.borrow_mut().apply(event.clone())?;
4484
4485        if let Some(database) = &mut self.database {
4486            database.update_account(&cell.borrow())?;
4487        }
4488        Ok(())
4489    }
4490
4491    /// Replaces the cached `order` from a non-event snapshot.
4492    ///
4493    /// Prefer [`Self::update_order`] for lifecycle state changes. Use this only for order state
4494    /// that is not represented by [`OrderEventAny`].
4495    ///
4496    /// # Errors
4497    ///
4498    /// Returns an error if updating the order indexes or database fails.
4499    pub fn replace_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
4500        self.refresh_order(order)?;
4501
4502        let client_order_id = order.client_order_id();
4503        match self.orders.get(&client_order_id) {
4504            // Reuse the existing cell so the canonical entry stays in place rather than
4505            // orphaning a stale cell.
4506            Some(order_cell) => *order_cell.borrow_mut() = order.clone(),
4507            None => {
4508                self.orders
4509                    .insert(client_order_id, SharedCell::new(order.clone()));
4510            }
4511        }
4512
4513        Ok(())
4514    }
4515
4516    /// Updates the cached order by applying an event and refreshing derived cache state.
4517    ///
4518    /// # Errors
4519    ///
4520    /// Returns an error if the order is not found or rejects the event.
4521    pub fn update_order(&mut self, event: &OrderEventAny) -> anyhow::Result<OrderAny> {
4522        let event_client_order_id = event.client_order_id();
4523        let client_order_id = if self.order_exists(&event_client_order_id) {
4524            event_client_order_id
4525        } else if let Some(venue_order_id) = event.venue_order_id() {
4526            self.index
4527                .venue_order_ids
4528                .get(&venue_order_id)
4529                .copied()
4530                .ok_or(OrderError::NotFound(event_client_order_id))?
4531        } else {
4532            return Err(OrderError::NotFound(event_client_order_id).into());
4533        };
4534
4535        let order_cell = self
4536            .orders
4537            .get(&client_order_id)
4538            .cloned()
4539            .ok_or(OrderError::NotFound(client_order_id))?;
4540
4541        // Apply on a snapshot first so a fallible `apply` (e.g. invalid state
4542        // transition) leaves the canonical cell untouched. On success we swap the
4543        // post-event value back into the cell so subsequent reads see the new state.
4544        let mut snapshot = order_cell.borrow().clone();
4545        snapshot.apply(event.clone())?;
4546        *order_cell.borrow_mut() = snapshot.clone();
4547
4548        if let Err(e) = self.refresh_order(&snapshot) {
4549            log::error!("Error updating order in cache: {e}");
4550        }
4551
4552        Ok(snapshot)
4553    }
4554
4555    fn refresh_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
4556        let client_order_id = order.client_order_id();
4557
4558        if order.is_active_local() {
4559            self.index.orders_active_local.insert(client_order_id);
4560        } else {
4561            self.index.orders_active_local.remove(&client_order_id);
4562        }
4563
4564        // Update venue order ID
4565        if let Some(venue_order_id) = order.venue_order_id() {
4566            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
4567            // venues which use a cancel+replace update strategy.
4568            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
4569                let overwrite = matches!(order.last_event(), OrderEventAny::Updated(_));
4570                if let Err(e) =
4571                    self.add_venue_order_id(&order.client_order_id(), &venue_order_id, overwrite)
4572                {
4573                    log::error!("Error indexing venue order ID in cache: {e}");
4574                }
4575            }
4576        }
4577
4578        // Update in-flight state
4579        if order.is_inflight() {
4580            self.index.orders_inflight.insert(client_order_id);
4581        } else {
4582            self.index.orders_inflight.remove(&client_order_id);
4583        }
4584
4585        // Update open/closed state
4586        if order.is_open() {
4587            self.index.orders_closed.remove(&client_order_id);
4588            self.index.orders_open.insert(client_order_id);
4589        } else if order.is_closed() {
4590            self.index.orders_open.remove(&client_order_id);
4591            self.index.orders_pending_cancel.remove(&client_order_id);
4592            self.index.orders_closed.insert(client_order_id);
4593        }
4594
4595        // Update emulation index
4596        if let Some(emulation_trigger) = order.emulation_trigger()
4597            && emulation_trigger != TriggerType::NoTrigger
4598            && !order.is_closed()
4599        {
4600            self.index.orders_emulated.insert(client_order_id);
4601        } else {
4602            self.index.orders_emulated.remove(&client_order_id);
4603        }
4604
4605        // Update account orders index when account_id becomes available
4606        if let Some(account_id) = order.account_id() {
4607            self.index
4608                .account_orders
4609                .entry(account_id)
4610                .or_default()
4611                .insert(client_order_id);
4612        }
4613
4614        // Update own book
4615        if !self.own_books.is_empty() {
4616            let own_book = self.own_order_book(&order.instrument_id());
4617            if (own_book.is_some() && order.is_closed()) || should_handle_own_book_order(order) {
4618                self.update_own_order_book(order);
4619            }
4620        }
4621
4622        if let Some(database) = &mut self.database {
4623            database.update_order(order.last_event())?;
4624            // TODO: Implement order snapshots
4625            // if self.snapshot_orders {
4626            //     database.snapshot_order_state(order)?;
4627            // }
4628        }
4629
4630        Ok(())
4631    }
4632
4633    /// Updates the `order` as pending cancel locally.
4634    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
4635        self.index
4636            .orders_pending_cancel
4637            .insert(order.client_order_id());
4638    }
4639
4640    /// Updates the `position` in the cache.
4641    ///
4642    /// Reuses the existing cell when present so any held [`PositionRef`] handles continue to point
4643    /// at the canonical entry; only inserts a new cell when the position is unknown.
4644    ///
4645    /// # Errors
4646    ///
4647    /// Returns an error if updating the position in the database fails.
4648    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
4649        // Update open/closed state
4650
4651        if position.is_open() {
4652            self.index.positions_open.insert(position.id);
4653            self.index.positions_closed.remove(&position.id);
4654        } else {
4655            self.index.positions_closed.insert(position.id);
4656            self.index.positions_open.remove(&position.id);
4657        }
4658
4659        if let Some(database) = &mut self.database {
4660            database.update_position(position)?;
4661            // TODO: Implement order snapshots
4662            // if self.snapshot_orders {
4663            //     database.snapshot_order_state(order)?;
4664            // }
4665        }
4666
4667        match self.positions.get(&position.id) {
4668            Some(position_cell) => *position_cell.borrow_mut() = position.clone(),
4669            None => {
4670                self.positions
4671                    .insert(position.id, SharedCell::new(position.clone()));
4672            }
4673        }
4674
4675        Ok(())
4676    }
4677
4678    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
4679    /// serializing it, and storing it in the position snapshots.
4680    ///
4681    /// # Errors
4682    ///
4683    /// Returns an error if serializing or storing the position snapshot fails.
4684    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<CacheSnapshotRef> {
4685        let position_id = position.id;
4686
4687        let mut copied_position = position.clone();
4688        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
4689        copied_position.id = PositionId::new(new_id);
4690
4691        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
4692        let position_serialized = serde_json::to_vec(&copied_position)?;
4693        let snapshot_index = self.position_snapshot_count(&position_id);
4694        let blob_ref = format!(
4695            "cache://position-snapshots/{}/{}",
4696            position_id.as_str(),
4697            snapshot_index,
4698        );
4699        let snapshot_blob = Bytes::from(position_serialized);
4700
4701        self.add(&blob_ref, snapshot_blob.clone())?;
4702        self.position_snapshots
4703            .entry(position_id)
4704            .or_default()
4705            .push(snapshot_blob.clone());
4706
4707        log::debug!("Snapshot {copied_position}");
4708        Ok(CacheSnapshotRef::new(blob_ref, snapshot_blob))
4709    }
4710
4711    /// Loads the cache-owned snapshot blob stored under `blob_ref`.
4712    ///
4713    /// The cache first checks in-memory snapshot state. When the blob is not present and a
4714    /// database adapter exists, the generic cache entries are loaded and checked for the same
4715    /// opaque reference.
4716    ///
4717    /// # Errors
4718    ///
4719    /// Returns an error if loading generic cache entries from the backing database fails.
4720    pub fn load_snapshot_blob(&mut self, blob_ref: &str) -> anyhow::Result<Option<Bytes>> {
4721        if let Some(blob) = self.snapshot_blob(blob_ref) {
4722            return Ok(Some(blob));
4723        }
4724
4725        if self.database.is_some() {
4726            self.cache_general()?;
4727        }
4728
4729        Ok(self.snapshot_blob(blob_ref))
4730    }
4731
4732    /// Restores the cache-owned snapshot blob stored under `blob_ref`.
4733    ///
4734    /// Only cache-owned `cache://position-snapshots/...` blobs are currently supported.
4735    ///
4736    /// # Errors
4737    ///
4738    /// Returns an error if the blob reference is unsupported, malformed, skips earlier
4739    /// snapshot frames, conflicts with an existing frame, or does not decode to the expected
4740    /// position snapshot.
4741    pub fn restore_snapshot_blob(&mut self, blob_ref: &str, blob: Bytes) -> anyhow::Result<()> {
4742        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref)?;
4743        validate_position_snapshot_blob(&position_id, blob.as_ref())?;
4744
4745        let frames = self.position_snapshots.entry(position_id).or_default();
4746        match frames.get(snapshot_index) {
4747            Some(existing) if existing == &blob => {}
4748            Some(_) => {
4749                anyhow::bail!(
4750                    "position snapshot frame {snapshot_index} for {position_id} already exists with different bytes"
4751                );
4752            }
4753            None if frames.len() == snapshot_index => frames.push(blob.clone()),
4754            None => {
4755                anyhow::bail!(
4756                    "position snapshot blob_ref {blob_ref} skips missing frame {}",
4757                    frames.len()
4758                );
4759            }
4760        }
4761
4762        self.general.insert(blob_ref.to_string(), blob);
4763        Ok(())
4764    }
4765
4766    fn snapshot_blob(&self, blob_ref: &str) -> Option<Bytes> {
4767        if let Some(blob) = self.general.get(blob_ref) {
4768            return Some(blob.clone());
4769        }
4770
4771        let (position_id, snapshot_index) = parse_position_snapshot_blob_ref(blob_ref).ok()?;
4772        self.position_snapshots
4773            .get(&position_id)
4774            .and_then(|frames| frames.get(snapshot_index))
4775            .cloned()
4776    }
4777
4778    /// Creates a snapshot of the `position` state in the database.
4779    ///
4780    /// # Errors
4781    ///
4782    /// Returns an error if snapshotting the position state fails.
4783    pub fn snapshot_position_state(
4784        &mut self,
4785        position: &Position,
4786        ts_snapshot: UnixNanos,
4787        unrealized_pnl: Option<Money>,
4788        open_only: Option<bool>,
4789    ) -> anyhow::Result<()> {
4790        let open_only = open_only.unwrap_or(true);
4791
4792        if open_only && !position.is_open() {
4793            return Ok(());
4794        }
4795
4796        if let Some(database) = &mut self.database {
4797            database
4798                .snapshot_position_state(position, ts_snapshot, unrealized_pnl)
4799                .map_err(|e| {
4800                    log::error!(
4801                        "Failed to snapshot position state for {}: {e:?}",
4802                        position.id
4803                    );
4804                    e
4805                })?;
4806        } else {
4807            log::warn!(
4808                "Cannot snapshot position state for {} (no database configured)",
4809                position.id
4810            );
4811        }
4812
4813        Ok(())
4814    }
4815
4816    /// Gets the OMS type for the `position_id`.
4817    #[must_use]
4818    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
4819        // Get OMS type from the index
4820        if self.index.position_strategy.contains_key(position_id) {
4821            // For now, we'll default to NETTING
4822            // TODO: Store and retrieve actual OMS type per position
4823            Some(OmsType::Netting)
4824        } else {
4825            None
4826        }
4827    }
4828
4829    /// Gets the serialized position snapshot frames for the `position_id`.
4830    ///
4831    /// Each element in the returned vector is one JSON-encoded [`Position`] snapshot,
4832    /// in the order they were taken.
4833    #[must_use]
4834    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
4835        self.position_snapshots
4836            .get(position_id)
4837            .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
4838    }
4839
4840    /// Returns the number of stored snapshot frames for the `position_id`.
4841    ///
4842    /// Returns `0` when no frames are stored. Does not allocate or copy frame bytes.
4843    #[must_use]
4844    pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
4845        self.position_snapshots.get(position_id).map_or(0, Vec::len)
4846    }
4847
4848    /// Returns all position snapshots with the given optional filters.
4849    ///
4850    /// When `position_id` is `Some`, only snapshots for that position are returned.
4851    /// When `account_id` is `Some`, snapshots are filtered to that account.
4852    /// Frames that fail to deserialize are skipped with a warning.
4853    #[must_use]
4854    pub fn position_snapshots(
4855        &self,
4856        position_id: Option<&PositionId>,
4857        account_id: Option<&AccountId>,
4858    ) -> Vec<Position> {
4859        let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
4860            Some(pid) => match self.position_snapshots.get(pid) {
4861                Some(v) => Box::new(v.iter()),
4862                None => Box::new(std::iter::empty()),
4863            },
4864            None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
4865        };
4866
4867        let mut results: Vec<Position> = frames
4868            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
4869                Ok(position) => Some(position),
4870                Err(e) => {
4871                    log::warn!("Failed to decode position snapshot: {e}");
4872                    None
4873                }
4874            })
4875            .collect();
4876
4877        if let Some(aid) = account_id {
4878            results.retain(|p| p.account_id == *aid);
4879        }
4880
4881        results
4882    }
4883
4884    /// Returns position snapshots for `position_id` starting from the `skip`th frame.
4885    ///
4886    /// Use this to deserialize only newly appended snapshots when the caller already
4887    /// processed earlier frames. Returns an empty vector when no frames or fewer than
4888    /// `skip` frames are stored. Frames that fail to deserialize are skipped with a warning.
4889    #[must_use]
4890    pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
4891        let Some(frames) = self.position_snapshots.get(position_id) else {
4892            return Vec::new();
4893        };
4894
4895        frames
4896            .iter()
4897            .skip(skip)
4898            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
4899                Ok(position) => Some(position),
4900                Err(e) => {
4901                    log::warn!("Failed to decode position snapshot: {e}");
4902                    None
4903                }
4904            })
4905            .collect()
4906    }
4907
4908    /// Gets position snapshot IDs for the `instrument_id`.
4909    #[must_use]
4910    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
4911        // Get snapshot position IDs that match the instrument
4912        let mut result = AHashSet::new();
4913
4914        for (position_id, _) in &self.position_snapshots {
4915            // Check if this position is for the requested instrument
4916            if let Some(position_cell) = self.positions.get(position_id)
4917                && position_cell.borrow().instrument_id == *instrument_id
4918            {
4919                result.insert(*position_id);
4920            }
4921        }
4922        result
4923    }
4924
4925    /// Snapshots the `order` state in the database.
4926    ///
4927    /// # Errors
4928    ///
4929    /// Returns an error if snapshotting the order state fails.
4930    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
4931        let Some(database) = &self.database else {
4932            log::warn!(
4933                "Cannot snapshot order state for {} (no database configured)",
4934                order.client_order_id()
4935            );
4936            return Ok(());
4937        };
4938
4939        database.snapshot_order_state(order)
4940    }
4941
4942    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
4943
4944    // Collects references to the index sets that constrain an order query.
4945    //
4946    // Returns:
4947    // - `FilterSources::Unfiltered` when no filter is provided (the caller should iterate
4948    //   the full bucket).
4949    // - `FilterSources::Empty` when a filter is provided but the index has no entry for it
4950    //   (the resolved set is unconditionally empty, no further work needed).
4951    // - `FilterSources::Sets` with borrowed references to each filter source set.
4952    fn collect_order_filter_sources<'a>(
4953        &'a self,
4954        venue: Option<&Venue>,
4955        instrument_id: Option<&InstrumentId>,
4956        strategy_id: Option<&StrategyId>,
4957        account_id: Option<&AccountId>,
4958    ) -> FilterSources<'a, ClientOrderId> {
4959        let mut sources: Vec<&AHashSet<ClientOrderId>> = Vec::with_capacity(4);
4960
4961        if let Some(venue) = venue {
4962            match self.index.venue_orders.get(venue) {
4963                Some(set) => sources.push(set),
4964                None => return FilterSources::Empty,
4965            }
4966        }
4967
4968        if let Some(instrument_id) = instrument_id {
4969            match self.index.instrument_orders.get(instrument_id) {
4970                Some(set) => sources.push(set),
4971                None => return FilterSources::Empty,
4972            }
4973        }
4974
4975        if let Some(strategy_id) = strategy_id {
4976            match self.index.strategy_orders.get(strategy_id) {
4977                Some(set) => sources.push(set),
4978                None => return FilterSources::Empty,
4979            }
4980        }
4981
4982        if let Some(account_id) = account_id {
4983            match self.index.account_orders.get(account_id) {
4984                Some(set) => sources.push(set),
4985                None => return FilterSources::Empty,
4986            }
4987        }
4988
4989        if sources.is_empty() {
4990            FilterSources::Unfiltered
4991        } else {
4992            FilterSources::Sets(sources)
4993        }
4994    }
4995
4996    fn collect_position_filter_sources<'a>(
4997        &'a self,
4998        venue: Option<&Venue>,
4999        instrument_id: Option<&InstrumentId>,
5000        strategy_id: Option<&StrategyId>,
5001        account_id: Option<&AccountId>,
5002    ) -> FilterSources<'a, PositionId> {
5003        let mut sources: Vec<&AHashSet<PositionId>> = Vec::with_capacity(4);
5004
5005        if let Some(venue) = venue {
5006            match self.index.venue_positions.get(venue) {
5007                Some(set) => sources.push(set),
5008                None => return FilterSources::Empty,
5009            }
5010        }
5011
5012        if let Some(instrument_id) = instrument_id {
5013            match self.index.instrument_positions.get(instrument_id) {
5014                Some(set) => sources.push(set),
5015                None => return FilterSources::Empty,
5016            }
5017        }
5018
5019        if let Some(strategy_id) = strategy_id {
5020            match self.index.strategy_positions.get(strategy_id) {
5021                Some(set) => sources.push(set),
5022                None => return FilterSources::Empty,
5023            }
5024        }
5025
5026        if let Some(account_id) = account_id {
5027            match self.index.account_positions.get(account_id) {
5028                Some(set) => sources.push(set),
5029                None => return FilterSources::Empty,
5030            }
5031        }
5032
5033        if sources.is_empty() {
5034            FilterSources::Unfiltered
5035        } else {
5036            FilterSources::Sets(sources)
5037        }
5038    }
5039
5040    // Materializes the `ClientOrderId`s in `bucket` matching the optional filter parameters.
5041    //
5042    // Folds the bucket into the filter sources and runs a single size-ordered intersection,
5043    // avoiding the legacy two-step build-filter-set + bucket-intersection that allocated and
5044    // rehashed twice.
5045    fn query_orders_in_bucket(
5046        &self,
5047        bucket: &AHashSet<ClientOrderId>,
5048        venue: Option<&Venue>,
5049        instrument_id: Option<&InstrumentId>,
5050        strategy_id: Option<&StrategyId>,
5051        account_id: Option<&AccountId>,
5052    ) -> AHashSet<ClientOrderId> {
5053        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5054            FilterSources::Empty => AHashSet::new(),
5055            FilterSources::Unfiltered => bucket.clone(),
5056            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
5057        }
5058    }
5059
5060    fn query_positions_in_bucket(
5061        &self,
5062        bucket: &AHashSet<PositionId>,
5063        venue: Option<&Venue>,
5064        instrument_id: Option<&InstrumentId>,
5065        strategy_id: Option<&StrategyId>,
5066        account_id: Option<&AccountId>,
5067    ) -> AHashSet<PositionId> {
5068        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5069            FilterSources::Empty => AHashSet::new(),
5070            FilterSources::Unfiltered => bucket.clone(),
5071            FilterSources::Sets(sources) => intersect_pair_or_many(bucket, sources),
5072        }
5073    }
5074
5075    // Returns a borrowed or owned view of the orders in `bucket` matching the optional filter
5076    // parameters. Avoids cloning the bucket when no filter narrows it.
5077    fn view_orders_in_bucket<'a>(
5078        &'a self,
5079        bucket: &'a AHashSet<ClientOrderId>,
5080        venue: Option<&Venue>,
5081        instrument_id: Option<&InstrumentId>,
5082        strategy_id: Option<&StrategyId>,
5083        account_id: Option<&AccountId>,
5084    ) -> Cow<'a, AHashSet<ClientOrderId>> {
5085        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5086            FilterSources::Empty => Cow::Owned(AHashSet::new()),
5087            FilterSources::Unfiltered => Cow::Borrowed(bucket),
5088            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
5089        }
5090    }
5091
5092    fn view_positions_in_bucket<'a>(
5093        &'a self,
5094        bucket: &'a AHashSet<PositionId>,
5095        venue: Option<&Venue>,
5096        instrument_id: Option<&InstrumentId>,
5097        strategy_id: Option<&StrategyId>,
5098        account_id: Option<&AccountId>,
5099    ) -> Cow<'a, AHashSet<PositionId>> {
5100        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5101            FilterSources::Empty => Cow::Owned(AHashSet::new()),
5102            FilterSources::Unfiltered => Cow::Borrowed(bucket),
5103            FilterSources::Sets(sources) => Cow::Owned(intersect_pair_or_many(bucket, sources)),
5104        }
5105    }
5106
5107    // Returns a lazy iterator yielding the [`ClientOrderId`]s in `bucket` matching the optional
5108    // filter parameters. Avoids any [`Vec`] or [`AHashSet`] materialization in the result path,
5109    // and (for multi-filter calls) drives intersection from the smallest source while looking
5110    // up membership in the rest.
5111    fn iter_orders_in_bucket<'a>(
5112        &'a self,
5113        bucket: &'a AHashSet<ClientOrderId>,
5114        venue: Option<&Venue>,
5115        instrument_id: Option<&InstrumentId>,
5116        strategy_id: Option<&StrategyId>,
5117        account_id: Option<&AccountId>,
5118    ) -> Box<dyn Iterator<Item = ClientOrderId> + 'a> {
5119        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5120            FilterSources::Empty => Box::new(std::iter::empty()),
5121            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
5122            FilterSources::Sets(mut sources) => {
5123                sources.push(bucket);
5124                sources.sort_unstable_by_key(|s| s.len());
5125                let driver = sources[0];
5126                let rest: Vec<&'a AHashSet<ClientOrderId>> = sources[1..].to_vec();
5127                Box::new(
5128                    driver
5129                        .iter()
5130                        .copied()
5131                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
5132                )
5133            }
5134        }
5135    }
5136
5137    fn iter_positions_in_bucket<'a>(
5138        &'a self,
5139        bucket: &'a AHashSet<PositionId>,
5140        venue: Option<&Venue>,
5141        instrument_id: Option<&InstrumentId>,
5142        strategy_id: Option<&StrategyId>,
5143        account_id: Option<&AccountId>,
5144    ) -> Box<dyn Iterator<Item = PositionId> + 'a> {
5145        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5146            FilterSources::Empty => Box::new(std::iter::empty()),
5147            FilterSources::Unfiltered => Box::new(bucket.iter().copied()),
5148            FilterSources::Sets(mut sources) => {
5149                sources.push(bucket);
5150                sources.sort_unstable_by_key(|s| s.len());
5151                let driver = sources[0];
5152                let rest: Vec<&'a AHashSet<PositionId>> = sources[1..].to_vec();
5153                Box::new(
5154                    driver
5155                        .iter()
5156                        .copied()
5157                        .filter(move |id| rest.iter().all(|s| s.contains(id))),
5158                )
5159            }
5160        }
5161    }
5162
5163    // Counts orders in `bucket` matching the optional filter parameters.
5164    //
5165    // Drives intersection from the smallest filter source (or the bucket itself when no filter
5166    // is provided) and short-circuits by counting rather than collecting. With a side filter,
5167    // each candidate order is borrowed via its cell only long enough to inspect the side.
5168    fn count_orders_in_bucket(
5169        &self,
5170        bucket: &AHashSet<ClientOrderId>,
5171        venue: Option<&Venue>,
5172        instrument_id: Option<&InstrumentId>,
5173        strategy_id: Option<&StrategyId>,
5174        account_id: Option<&AccountId>,
5175        side: Option<OrderSide>,
5176    ) -> usize {
5177        let side = side.unwrap_or(OrderSide::NoOrderSide);
5178
5179        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5180            FilterSources::Empty => 0,
5181            FilterSources::Unfiltered => {
5182                if side == OrderSide::NoOrderSide {
5183                    bucket.len()
5184                } else {
5185                    bucket
5186                        .iter()
5187                        .filter(|id| self.order_side_matches(id, side))
5188                        .count()
5189                }
5190            }
5191            FilterSources::Sets(mut sources) => {
5192                sources.push(bucket);
5193                sources.sort_unstable_by_key(|s| s.len());
5194                let driver = sources[0];
5195                let rest = &sources[1..];
5196
5197                driver
5198                    .iter()
5199                    .filter(|id| rest.iter().all(|s| s.contains(id)))
5200                    .filter(|id| {
5201                        side == OrderSide::NoOrderSide || self.order_side_matches(id, side)
5202                    })
5203                    .count()
5204            }
5205        }
5206    }
5207
5208    fn count_positions_in_bucket(
5209        &self,
5210        bucket: &AHashSet<PositionId>,
5211        venue: Option<&Venue>,
5212        instrument_id: Option<&InstrumentId>,
5213        strategy_id: Option<&StrategyId>,
5214        account_id: Option<&AccountId>,
5215        side: Option<PositionSide>,
5216    ) -> usize {
5217        let side = side.unwrap_or(PositionSide::NoPositionSide);
5218
5219        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5220            FilterSources::Empty => 0,
5221            FilterSources::Unfiltered => {
5222                if side == PositionSide::NoPositionSide {
5223                    bucket.len()
5224                } else {
5225                    bucket
5226                        .iter()
5227                        .filter(|id| self.position_side_matches(id, side))
5228                        .count()
5229                }
5230            }
5231            FilterSources::Sets(mut sources) => {
5232                sources.push(bucket);
5233                sources.sort_unstable_by_key(|s| s.len());
5234                let driver = sources[0];
5235                let rest = &sources[1..];
5236
5237                driver
5238                    .iter()
5239                    .filter(|id| rest.iter().all(|s| s.contains(id)))
5240                    .filter(|id| {
5241                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
5242                    })
5243                    .count()
5244            }
5245        }
5246    }
5247
5248    // Returns whether any order in `bucket` matches the optional filter parameters.
5249    //
5250    // Mirrors `count_orders_in_bucket` but short-circuits on the first match. Useful for
5251    // `is_empty`-style gating in hot paths where the caller only needs to know whether at
5252    // least one matching order exists.
5253    fn any_orders_in_bucket(
5254        &self,
5255        bucket: &AHashSet<ClientOrderId>,
5256        venue: Option<&Venue>,
5257        instrument_id: Option<&InstrumentId>,
5258        strategy_id: Option<&StrategyId>,
5259        account_id: Option<&AccountId>,
5260        side: Option<OrderSide>,
5261    ) -> bool {
5262        let side = side.unwrap_or(OrderSide::NoOrderSide);
5263
5264        match self.collect_order_filter_sources(venue, instrument_id, strategy_id, account_id) {
5265            FilterSources::Empty => false,
5266            FilterSources::Unfiltered => {
5267                if side == OrderSide::NoOrderSide {
5268                    !bucket.is_empty()
5269                } else {
5270                    bucket.iter().any(|id| self.order_side_matches(id, side))
5271                }
5272            }
5273            FilterSources::Sets(mut sources) => {
5274                sources.push(bucket);
5275                sources.sort_unstable_by_key(|s| s.len());
5276                let driver = sources[0];
5277                let rest = &sources[1..];
5278
5279                driver
5280                    .iter()
5281                    .filter(|id| rest.iter().all(|s| s.contains(id)))
5282                    .any(|id| side == OrderSide::NoOrderSide || self.order_side_matches(id, side))
5283            }
5284        }
5285    }
5286
5287    fn any_positions_in_bucket(
5288        &self,
5289        bucket: &AHashSet<PositionId>,
5290        venue: Option<&Venue>,
5291        instrument_id: Option<&InstrumentId>,
5292        strategy_id: Option<&StrategyId>,
5293        account_id: Option<&AccountId>,
5294        side: Option<PositionSide>,
5295    ) -> bool {
5296        let side = side.unwrap_or(PositionSide::NoPositionSide);
5297
5298        match self.collect_position_filter_sources(venue, instrument_id, strategy_id, account_id) {
5299            FilterSources::Empty => false,
5300            FilterSources::Unfiltered => {
5301                if side == PositionSide::NoPositionSide {
5302                    !bucket.is_empty()
5303                } else {
5304                    bucket.iter().any(|id| self.position_side_matches(id, side))
5305                }
5306            }
5307            FilterSources::Sets(mut sources) => {
5308                sources.push(bucket);
5309                sources.sort_unstable_by_key(|s| s.len());
5310                let driver = sources[0];
5311                let rest = &sources[1..];
5312
5313                driver
5314                    .iter()
5315                    .filter(|id| rest.iter().all(|s| s.contains(id)))
5316                    .any(|id| {
5317                        side == PositionSide::NoPositionSide || self.position_side_matches(id, side)
5318                    })
5319            }
5320        }
5321    }
5322
5323    fn order_side_matches(&self, client_order_id: &ClientOrderId, side: OrderSide) -> bool {
5324        self.orders
5325            .get(client_order_id)
5326            .is_some_and(|cell| cell.borrow().order_side() == side)
5327    }
5328
5329    fn position_side_matches(&self, position_id: &PositionId, side: PositionSide) -> bool {
5330        self.positions
5331            .get(position_id)
5332            .is_some_and(|cell| cell.borrow().side == side)
5333    }
5334
5335    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
5336    ///
5337    /// # Panics
5338    ///
5339    /// Panics if any `client_order_id` in the set is not found in the cache.
5340    fn get_orders_for_ids(
5341        &self,
5342        client_order_ids: &AHashSet<ClientOrderId>,
5343        side: Option<OrderSide>,
5344    ) -> Vec<OrderRef<'_>> {
5345        let side = side.unwrap_or(OrderSide::NoOrderSide);
5346        let mut orders = Vec::new();
5347
5348        for client_order_id in client_order_ids {
5349            let order_cell = self
5350                .orders
5351                .get(client_order_id)
5352                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
5353            let order = OrderRef::new(order_cell.borrow());
5354
5355            if side == OrderSide::NoOrderSide || side == order.order_side() {
5356                orders.push(order);
5357            }
5358        }
5359
5360        // Sort so callers receive a deterministic Vec across runs; the
5361        // underlying client_order_ids set is AHash-backed.
5362        orders.sort_by_key(|o| o.client_order_id());
5363        orders
5364    }
5365
5366    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
5367    ///
5368    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
5369    /// those positions while the vector is alive will panic at runtime. Drop the vector before
5370    /// issuing writes.
5371    ///
5372    /// # Panics
5373    ///
5374    /// Panics if any `position_id` in the set is not found in the cache.
5375    fn get_positions_for_ids(
5376        &self,
5377        position_ids: &AHashSet<PositionId>,
5378        side: Option<PositionSide>,
5379    ) -> Vec<PositionRef<'_>> {
5380        let side = side.unwrap_or(PositionSide::NoPositionSide);
5381        let mut positions = Vec::new();
5382
5383        for position_id in position_ids {
5384            let position_cell = self
5385                .positions
5386                .get(position_id)
5387                .unwrap_or_else(|| panic!("Position {position_id} not found"));
5388            let position = PositionRef::new(position_cell.borrow());
5389
5390            if side == PositionSide::NoPositionSide || side == position.side {
5391                positions.push(position);
5392            }
5393        }
5394
5395        // Sort so callers receive a deterministic Vec across runs; the
5396        // underlying position_ids set is AHash-backed.
5397        positions.sort_by_key(|p| p.id);
5398        positions
5399    }
5400
5401    /// Returns the `ClientOrderId`s of all orders.
5402    #[must_use]
5403    pub fn client_order_ids(
5404        &self,
5405        venue: Option<&Venue>,
5406        instrument_id: Option<&InstrumentId>,
5407        strategy_id: Option<&StrategyId>,
5408        account_id: Option<&AccountId>,
5409    ) -> AHashSet<ClientOrderId> {
5410        self.query_orders_in_bucket(
5411            &self.index.orders,
5412            venue,
5413            instrument_id,
5414            strategy_id,
5415            account_id,
5416        )
5417    }
5418
5419    /// Returns the `ClientOrderId`s of all open orders.
5420    #[must_use]
5421    pub fn client_order_ids_open(
5422        &self,
5423        venue: Option<&Venue>,
5424        instrument_id: Option<&InstrumentId>,
5425        strategy_id: Option<&StrategyId>,
5426        account_id: Option<&AccountId>,
5427    ) -> AHashSet<ClientOrderId> {
5428        self.query_orders_in_bucket(
5429            &self.index.orders_open,
5430            venue,
5431            instrument_id,
5432            strategy_id,
5433            account_id,
5434        )
5435    }
5436
5437    /// Returns the `ClientOrderId`s of all closed orders.
5438    #[must_use]
5439    pub fn client_order_ids_closed(
5440        &self,
5441        venue: Option<&Venue>,
5442        instrument_id: Option<&InstrumentId>,
5443        strategy_id: Option<&StrategyId>,
5444        account_id: Option<&AccountId>,
5445    ) -> AHashSet<ClientOrderId> {
5446        self.query_orders_in_bucket(
5447            &self.index.orders_closed,
5448            venue,
5449            instrument_id,
5450            strategy_id,
5451            account_id,
5452        )
5453    }
5454
5455    /// Returns the `ClientOrderId`s of all locally active orders.
5456    ///
5457    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
5458    /// (a superset of emulated orders).
5459    #[must_use]
5460    pub fn client_order_ids_active_local(
5461        &self,
5462        venue: Option<&Venue>,
5463        instrument_id: Option<&InstrumentId>,
5464        strategy_id: Option<&StrategyId>,
5465        account_id: Option<&AccountId>,
5466    ) -> AHashSet<ClientOrderId> {
5467        self.query_orders_in_bucket(
5468            &self.index.orders_active_local,
5469            venue,
5470            instrument_id,
5471            strategy_id,
5472            account_id,
5473        )
5474    }
5475
5476    /// Returns the `ClientOrderId`s of all emulated orders.
5477    #[must_use]
5478    pub fn client_order_ids_emulated(
5479        &self,
5480        venue: Option<&Venue>,
5481        instrument_id: Option<&InstrumentId>,
5482        strategy_id: Option<&StrategyId>,
5483        account_id: Option<&AccountId>,
5484    ) -> AHashSet<ClientOrderId> {
5485        self.query_orders_in_bucket(
5486            &self.index.orders_emulated,
5487            venue,
5488            instrument_id,
5489            strategy_id,
5490            account_id,
5491        )
5492    }
5493
5494    /// Returns the `ClientOrderId`s of all in-flight orders.
5495    #[must_use]
5496    pub fn client_order_ids_inflight(
5497        &self,
5498        venue: Option<&Venue>,
5499        instrument_id: Option<&InstrumentId>,
5500        strategy_id: Option<&StrategyId>,
5501        account_id: Option<&AccountId>,
5502    ) -> AHashSet<ClientOrderId> {
5503        self.query_orders_in_bucket(
5504            &self.index.orders_inflight,
5505            venue,
5506            instrument_id,
5507            strategy_id,
5508            account_id,
5509        )
5510    }
5511
5512    /// Returns `PositionId`s of all positions.
5513    #[must_use]
5514    pub fn position_ids(
5515        &self,
5516        venue: Option<&Venue>,
5517        instrument_id: Option<&InstrumentId>,
5518        strategy_id: Option<&StrategyId>,
5519        account_id: Option<&AccountId>,
5520    ) -> AHashSet<PositionId> {
5521        self.query_positions_in_bucket(
5522            &self.index.positions,
5523            venue,
5524            instrument_id,
5525            strategy_id,
5526            account_id,
5527        )
5528    }
5529
5530    /// Returns the `PositionId`s of all open positions.
5531    #[must_use]
5532    pub fn position_open_ids(
5533        &self,
5534        venue: Option<&Venue>,
5535        instrument_id: Option<&InstrumentId>,
5536        strategy_id: Option<&StrategyId>,
5537        account_id: Option<&AccountId>,
5538    ) -> AHashSet<PositionId> {
5539        self.query_positions_in_bucket(
5540            &self.index.positions_open,
5541            venue,
5542            instrument_id,
5543            strategy_id,
5544            account_id,
5545        )
5546    }
5547
5548    /// Returns the `PositionId`s of all closed positions.
5549    #[must_use]
5550    pub fn position_closed_ids(
5551        &self,
5552        venue: Option<&Venue>,
5553        instrument_id: Option<&InstrumentId>,
5554        strategy_id: Option<&StrategyId>,
5555        account_id: Option<&AccountId>,
5556    ) -> AHashSet<PositionId> {
5557        self.query_positions_in_bucket(
5558            &self.index.positions_closed,
5559            venue,
5560            instrument_id,
5561            strategy_id,
5562            account_id,
5563        )
5564    }
5565
5566    /// Returns a borrowed view over the [`ClientOrderId`]s of all orders matching the optional
5567    /// filter parameters.
5568    ///
5569    /// The returned [`Cow`] borrows the underlying index when no filter is provided and only
5570    /// allocates an owned [`AHashSet`] when an intersection is required. Prefer this over
5571    /// [`Self::client_order_ids`] when the caller only needs to iterate or read membership.
5572    #[must_use]
5573    pub fn client_order_ids_view(
5574        &self,
5575        venue: Option<&Venue>,
5576        instrument_id: Option<&InstrumentId>,
5577        strategy_id: Option<&StrategyId>,
5578        account_id: Option<&AccountId>,
5579    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5580        self.view_orders_in_bucket(
5581            &self.index.orders,
5582            venue,
5583            instrument_id,
5584            strategy_id,
5585            account_id,
5586        )
5587    }
5588
5589    /// Returns a borrowed view over the [`ClientOrderId`]s of all open orders.
5590    #[must_use]
5591    pub fn client_order_ids_open_view(
5592        &self,
5593        venue: Option<&Venue>,
5594        instrument_id: Option<&InstrumentId>,
5595        strategy_id: Option<&StrategyId>,
5596        account_id: Option<&AccountId>,
5597    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5598        self.view_orders_in_bucket(
5599            &self.index.orders_open,
5600            venue,
5601            instrument_id,
5602            strategy_id,
5603            account_id,
5604        )
5605    }
5606
5607    /// Returns a borrowed view over the [`ClientOrderId`]s of all closed orders.
5608    #[must_use]
5609    pub fn client_order_ids_closed_view(
5610        &self,
5611        venue: Option<&Venue>,
5612        instrument_id: Option<&InstrumentId>,
5613        strategy_id: Option<&StrategyId>,
5614        account_id: Option<&AccountId>,
5615    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5616        self.view_orders_in_bucket(
5617            &self.index.orders_closed,
5618            venue,
5619            instrument_id,
5620            strategy_id,
5621            account_id,
5622        )
5623    }
5624
5625    /// Returns a borrowed view over the [`ClientOrderId`]s of all locally active orders.
5626    #[must_use]
5627    pub fn client_order_ids_active_local_view(
5628        &self,
5629        venue: Option<&Venue>,
5630        instrument_id: Option<&InstrumentId>,
5631        strategy_id: Option<&StrategyId>,
5632        account_id: Option<&AccountId>,
5633    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5634        self.view_orders_in_bucket(
5635            &self.index.orders_active_local,
5636            venue,
5637            instrument_id,
5638            strategy_id,
5639            account_id,
5640        )
5641    }
5642
5643    /// Returns a borrowed view over the [`ClientOrderId`]s of all emulated orders.
5644    #[must_use]
5645    pub fn client_order_ids_emulated_view(
5646        &self,
5647        venue: Option<&Venue>,
5648        instrument_id: Option<&InstrumentId>,
5649        strategy_id: Option<&StrategyId>,
5650        account_id: Option<&AccountId>,
5651    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5652        self.view_orders_in_bucket(
5653            &self.index.orders_emulated,
5654            venue,
5655            instrument_id,
5656            strategy_id,
5657            account_id,
5658        )
5659    }
5660
5661    /// Returns a borrowed view over the [`ClientOrderId`]s of all in-flight orders.
5662    #[must_use]
5663    pub fn client_order_ids_inflight_view(
5664        &self,
5665        venue: Option<&Venue>,
5666        instrument_id: Option<&InstrumentId>,
5667        strategy_id: Option<&StrategyId>,
5668        account_id: Option<&AccountId>,
5669    ) -> Cow<'_, AHashSet<ClientOrderId>> {
5670        self.view_orders_in_bucket(
5671            &self.index.orders_inflight,
5672            venue,
5673            instrument_id,
5674            strategy_id,
5675            account_id,
5676        )
5677    }
5678
5679    /// Returns a borrowed view over the [`PositionId`]s of all positions.
5680    #[must_use]
5681    pub fn position_ids_view(
5682        &self,
5683        venue: Option<&Venue>,
5684        instrument_id: Option<&InstrumentId>,
5685        strategy_id: Option<&StrategyId>,
5686        account_id: Option<&AccountId>,
5687    ) -> Cow<'_, AHashSet<PositionId>> {
5688        self.view_positions_in_bucket(
5689            &self.index.positions,
5690            venue,
5691            instrument_id,
5692            strategy_id,
5693            account_id,
5694        )
5695    }
5696
5697    /// Returns a borrowed view over the [`PositionId`]s of all open positions.
5698    #[must_use]
5699    pub fn position_open_ids_view(
5700        &self,
5701        venue: Option<&Venue>,
5702        instrument_id: Option<&InstrumentId>,
5703        strategy_id: Option<&StrategyId>,
5704        account_id: Option<&AccountId>,
5705    ) -> Cow<'_, AHashSet<PositionId>> {
5706        self.view_positions_in_bucket(
5707            &self.index.positions_open,
5708            venue,
5709            instrument_id,
5710            strategy_id,
5711            account_id,
5712        )
5713    }
5714
5715    /// Returns a borrowed view over the [`PositionId`]s of all closed positions.
5716    #[must_use]
5717    pub fn position_closed_ids_view(
5718        &self,
5719        venue: Option<&Venue>,
5720        instrument_id: Option<&InstrumentId>,
5721        strategy_id: Option<&StrategyId>,
5722        account_id: Option<&AccountId>,
5723    ) -> Cow<'_, AHashSet<PositionId>> {
5724        self.view_positions_in_bucket(
5725            &self.index.positions_closed,
5726            venue,
5727            instrument_id,
5728            strategy_id,
5729            account_id,
5730        )
5731    }
5732
5733    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all orders matching the optional
5734    /// filter parameters.
5735    ///
5736    /// Avoids the [`AHashSet`] allocation performed by [`Self::client_order_ids`]. Useful when
5737    /// the caller iterates the result once and discards it.
5738    pub fn iter_client_order_ids(
5739        &self,
5740        venue: Option<&Venue>,
5741        instrument_id: Option<&InstrumentId>,
5742        strategy_id: Option<&StrategyId>,
5743        account_id: Option<&AccountId>,
5744    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5745        self.iter_orders_in_bucket(
5746            &self.index.orders,
5747            venue,
5748            instrument_id,
5749            strategy_id,
5750            account_id,
5751        )
5752    }
5753
5754    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all open orders.
5755    pub fn iter_client_order_ids_open(
5756        &self,
5757        venue: Option<&Venue>,
5758        instrument_id: Option<&InstrumentId>,
5759        strategy_id: Option<&StrategyId>,
5760        account_id: Option<&AccountId>,
5761    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5762        self.iter_orders_in_bucket(
5763            &self.index.orders_open,
5764            venue,
5765            instrument_id,
5766            strategy_id,
5767            account_id,
5768        )
5769    }
5770
5771    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all closed orders.
5772    pub fn iter_client_order_ids_closed(
5773        &self,
5774        venue: Option<&Venue>,
5775        instrument_id: Option<&InstrumentId>,
5776        strategy_id: Option<&StrategyId>,
5777        account_id: Option<&AccountId>,
5778    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5779        self.iter_orders_in_bucket(
5780            &self.index.orders_closed,
5781            venue,
5782            instrument_id,
5783            strategy_id,
5784            account_id,
5785        )
5786    }
5787
5788    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all locally active orders.
5789    pub fn iter_client_order_ids_active_local(
5790        &self,
5791        venue: Option<&Venue>,
5792        instrument_id: Option<&InstrumentId>,
5793        strategy_id: Option<&StrategyId>,
5794        account_id: Option<&AccountId>,
5795    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5796        self.iter_orders_in_bucket(
5797            &self.index.orders_active_local,
5798            venue,
5799            instrument_id,
5800            strategy_id,
5801            account_id,
5802        )
5803    }
5804
5805    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all emulated orders.
5806    pub fn iter_client_order_ids_emulated(
5807        &self,
5808        venue: Option<&Venue>,
5809        instrument_id: Option<&InstrumentId>,
5810        strategy_id: Option<&StrategyId>,
5811        account_id: Option<&AccountId>,
5812    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5813        self.iter_orders_in_bucket(
5814            &self.index.orders_emulated,
5815            venue,
5816            instrument_id,
5817            strategy_id,
5818            account_id,
5819        )
5820    }
5821
5822    /// Returns a lazy iterator yielding [`ClientOrderId`]s of all in-flight orders.
5823    pub fn iter_client_order_ids_inflight(
5824        &self,
5825        venue: Option<&Venue>,
5826        instrument_id: Option<&InstrumentId>,
5827        strategy_id: Option<&StrategyId>,
5828        account_id: Option<&AccountId>,
5829    ) -> Box<dyn Iterator<Item = ClientOrderId> + '_> {
5830        self.iter_orders_in_bucket(
5831            &self.index.orders_inflight,
5832            venue,
5833            instrument_id,
5834            strategy_id,
5835            account_id,
5836        )
5837    }
5838
5839    /// Returns a lazy iterator yielding [`PositionId`]s of all positions matching the filters.
5840    pub fn iter_position_ids(
5841        &self,
5842        venue: Option<&Venue>,
5843        instrument_id: Option<&InstrumentId>,
5844        strategy_id: Option<&StrategyId>,
5845        account_id: Option<&AccountId>,
5846    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5847        self.iter_positions_in_bucket(
5848            &self.index.positions,
5849            venue,
5850            instrument_id,
5851            strategy_id,
5852            account_id,
5853        )
5854    }
5855
5856    /// Returns a lazy iterator yielding [`PositionId`]s of all open positions.
5857    pub fn iter_position_open_ids(
5858        &self,
5859        venue: Option<&Venue>,
5860        instrument_id: Option<&InstrumentId>,
5861        strategy_id: Option<&StrategyId>,
5862        account_id: Option<&AccountId>,
5863    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5864        self.iter_positions_in_bucket(
5865            &self.index.positions_open,
5866            venue,
5867            instrument_id,
5868            strategy_id,
5869            account_id,
5870        )
5871    }
5872
5873    /// Returns a lazy iterator yielding [`PositionId`]s of all closed positions.
5874    pub fn iter_position_closed_ids(
5875        &self,
5876        venue: Option<&Venue>,
5877        instrument_id: Option<&InstrumentId>,
5878        strategy_id: Option<&StrategyId>,
5879        account_id: Option<&AccountId>,
5880    ) -> Box<dyn Iterator<Item = PositionId> + '_> {
5881        self.iter_positions_in_bucket(
5882            &self.index.positions_closed,
5883            venue,
5884            instrument_id,
5885            strategy_id,
5886            account_id,
5887        )
5888    }
5889
5890    /// Returns the `ComponentId`s of all actors.
5891    #[must_use]
5892    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
5893        self.index.actors.clone()
5894    }
5895
5896    /// Returns the `StrategyId`s of all strategies.
5897    #[must_use]
5898    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
5899        self.index.strategies.clone()
5900    }
5901
5902    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
5903    #[must_use]
5904    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
5905        self.index.exec_algorithms.clone()
5906    }
5907
5908    // -- ORDER QUERIES ---------------------------------------------------------------------------
5909
5910    /// Gets a borrow of the order with the `client_order_id` (if found).
5911    ///
5912    /// The returned [`OrderRef`] is tied to the cache borrow's scope and panics at runtime if
5913    /// held across a mutation of the same order. Drop the borrow before dispatching events; if
5914    /// post-event state is required, perform a fresh lookup. Use [`Self::order_owned`] when an
5915    /// owned snapshot is needed for a boundary handover.
5916    #[must_use]
5917    pub fn order_ref(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
5918        self.orders
5919            .get(client_order_id)
5920            .map(|order_cell| OrderRef::new(order_cell.borrow()))
5921    }
5922
5923    /// Gets a borrow of the order with the `client_order_id` (if found).
5924    ///
5925    /// Prefer [`Self::order_ref`] in new native code.
5926    #[must_use]
5927    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<OrderRef<'_>> {
5928        self.order_ref(client_order_id)
5929    }
5930
5931    /// Gets a borrow of the order with the `client_order_id`.
5932    ///
5933    /// # Errors
5934    ///
5935    /// Returns [`OrderLookupError::NotFound`] when the order is not present in the cache.
5936    pub fn try_order_ref(
5937        &self,
5938        client_order_id: &ClientOrderId,
5939    ) -> Result<OrderRef<'_>, OrderLookupError> {
5940        self.orders
5941            .get(client_order_id)
5942            .map(|order_cell| OrderRef::new(order_cell.borrow()))
5943            .ok_or_else(|| OrderLookupError::not_found(*client_order_id))
5944    }
5945
5946    /// Gets a borrow of the order with the `client_order_id`.
5947    ///
5948    /// Prefer [`Self::try_order_ref`] in new native code.
5949    ///
5950    /// # Errors
5951    ///
5952    /// Returns [`OrderLookupError::NotFound`] when the order is not present in the cache.
5953    pub fn try_order(
5954        &self,
5955        client_order_id: &ClientOrderId,
5956    ) -> Result<OrderRef<'_>, OrderLookupError> {
5957        self.try_order_ref(client_order_id)
5958    }
5959
5960    /// Gets an exclusive write borrow of the order with the `client_order_id` (if found).
5961    ///
5962    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
5963    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
5964    /// exposes immutable cache borrows and therefore cannot reach this method.
5965    ///
5966    /// While the returned [`OrderRefMut`] is alive, no other read or write of the same order is
5967    /// permitted. Drop the borrow before dispatching events or taking any other cache borrow that
5968    /// may re-enter the same order.
5969    #[must_use]
5970    pub fn order_mut(&mut self, client_order_id: &ClientOrderId) -> Option<OrderRefMut<'_>> {
5971        self.orders
5972            .get(client_order_id)
5973            .map(|order_cell| OrderRefMut::new(order_cell.borrow_mut()))
5974    }
5975
5976    /// Gets an owned copy of the order with the `client_order_id` (if found).
5977    ///
5978    /// Use when downstream needs an owned [`OrderAny`] that crosses a boundary (for example, an
5979    /// adapter `get_order` API). The copy will not reflect later cache mutations.
5980    #[must_use]
5981    pub fn order_owned(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
5982        self.orders
5983            .get(client_order_id)
5984            .map(|order_cell| order_cell.borrow().clone())
5985    }
5986
5987    /// Gets an owned snapshot of the order with the `client_order_id`.
5988    ///
5989    /// # Errors
5990    ///
5991    /// Returns [`OrderLookupError::NotFound`] when the order is not present in the cache.
5992    pub fn try_order_owned(
5993        &self,
5994        client_order_id: &ClientOrderId,
5995    ) -> Result<OrderAny, OrderLookupError> {
5996        self.try_order_ref(client_order_id)
5997            .map(|order| order.cloned())
5998    }
5999
6000    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
6001    #[must_use]
6002    pub fn orders_for_ids(
6003        &self,
6004        client_order_ids: &[ClientOrderId],
6005        context: &dyn Display,
6006    ) -> Vec<OrderAny> {
6007        let mut orders = Vec::with_capacity(client_order_ids.len());
6008        for id in client_order_ids {
6009            match self.orders.get(id) {
6010                Some(order_cell) => orders.push(order_cell.borrow().clone()),
6011                None => log::error!("Order {id} not found in cache for {context}"),
6012            }
6013        }
6014        orders
6015    }
6016
6017    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
6018    #[must_use]
6019    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
6020        self.index.venue_order_ids.get(venue_order_id)
6021    }
6022
6023    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
6024    #[must_use]
6025    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
6026        self.index.client_order_ids.get(client_order_id)
6027    }
6028
6029    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
6030    #[must_use]
6031    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
6032        self.index.order_client.get(client_order_id)
6033    }
6034
6035    /// Returns borrows of all orders matching the optional filter parameters.
6036    ///
6037    /// Each [`Ref`] in the returned vector borrows its underlying cell; mutating any of
6038    /// those orders while the vector is alive will panic at runtime. Drop the vector
6039    /// before issuing writes.
6040    #[must_use]
6041    pub fn orders_refs(
6042        &self,
6043        venue: Option<&Venue>,
6044        instrument_id: Option<&InstrumentId>,
6045        strategy_id: Option<&StrategyId>,
6046        account_id: Option<&AccountId>,
6047        side: Option<OrderSide>,
6048    ) -> Vec<OrderRef<'_>> {
6049        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
6050        self.get_orders_for_ids(&client_order_ids, side)
6051    }
6052
6053    /// Returns borrows of all orders matching the optional filter parameters.
6054    ///
6055    /// Prefer [`Self::orders_refs`] in new native code.
6056    #[must_use]
6057    pub fn orders(
6058        &self,
6059        venue: Option<&Venue>,
6060        instrument_id: Option<&InstrumentId>,
6061        strategy_id: Option<&StrategyId>,
6062        account_id: Option<&AccountId>,
6063        side: Option<OrderSide>,
6064    ) -> Vec<OrderRef<'_>> {
6065        self.orders_refs(venue, instrument_id, strategy_id, account_id, side)
6066    }
6067
6068    /// Returns borrows of all open orders matching the optional filter parameters.
6069    #[must_use]
6070    pub fn orders_open_refs(
6071        &self,
6072        venue: Option<&Venue>,
6073        instrument_id: Option<&InstrumentId>,
6074        strategy_id: Option<&StrategyId>,
6075        account_id: Option<&AccountId>,
6076        side: Option<OrderSide>,
6077    ) -> Vec<OrderRef<'_>> {
6078        let client_order_ids =
6079            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
6080        self.get_orders_for_ids(&client_order_ids, side)
6081    }
6082
6083    /// Returns borrows of all open orders matching the optional filter parameters.
6084    ///
6085    /// Prefer [`Self::orders_open_refs`] in new native code.
6086    #[must_use]
6087    pub fn orders_open(
6088        &self,
6089        venue: Option<&Venue>,
6090        instrument_id: Option<&InstrumentId>,
6091        strategy_id: Option<&StrategyId>,
6092        account_id: Option<&AccountId>,
6093        side: Option<OrderSide>,
6094    ) -> Vec<OrderRef<'_>> {
6095        self.orders_open_refs(venue, instrument_id, strategy_id, account_id, side)
6096    }
6097
6098    /// Returns borrows of all closed orders matching the optional filter parameters.
6099    #[must_use]
6100    pub fn orders_closed_refs(
6101        &self,
6102        venue: Option<&Venue>,
6103        instrument_id: Option<&InstrumentId>,
6104        strategy_id: Option<&StrategyId>,
6105        account_id: Option<&AccountId>,
6106        side: Option<OrderSide>,
6107    ) -> Vec<OrderRef<'_>> {
6108        let client_order_ids =
6109            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
6110        self.get_orders_for_ids(&client_order_ids, side)
6111    }
6112
6113    /// Returns borrows of all closed orders matching the optional filter parameters.
6114    ///
6115    /// Prefer [`Self::orders_closed_refs`] in new native code.
6116    #[must_use]
6117    pub fn orders_closed(
6118        &self,
6119        venue: Option<&Venue>,
6120        instrument_id: Option<&InstrumentId>,
6121        strategy_id: Option<&StrategyId>,
6122        account_id: Option<&AccountId>,
6123        side: Option<OrderSide>,
6124    ) -> Vec<OrderRef<'_>> {
6125        self.orders_closed_refs(venue, instrument_id, strategy_id, account_id, side)
6126    }
6127
6128    /// Returns borrows of all locally active orders matching the optional filter parameters.
6129    ///
6130    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
6131    /// (a superset of emulated orders).
6132    #[must_use]
6133    pub fn orders_active_local_refs(
6134        &self,
6135        venue: Option<&Venue>,
6136        instrument_id: Option<&InstrumentId>,
6137        strategy_id: Option<&StrategyId>,
6138        account_id: Option<&AccountId>,
6139        side: Option<OrderSide>,
6140    ) -> Vec<OrderRef<'_>> {
6141        let client_order_ids =
6142            self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
6143        self.get_orders_for_ids(&client_order_ids, side)
6144    }
6145
6146    /// Returns borrows of all locally active orders matching the optional filter parameters.
6147    ///
6148    /// Prefer [`Self::orders_active_local_refs`] in new native code.
6149    #[must_use]
6150    pub fn orders_active_local(
6151        &self,
6152        venue: Option<&Venue>,
6153        instrument_id: Option<&InstrumentId>,
6154        strategy_id: Option<&StrategyId>,
6155        account_id: Option<&AccountId>,
6156        side: Option<OrderSide>,
6157    ) -> Vec<OrderRef<'_>> {
6158        self.orders_active_local_refs(venue, instrument_id, strategy_id, account_id, side)
6159    }
6160
6161    /// Returns borrows of all emulated orders matching the optional filter parameters.
6162    #[must_use]
6163    pub fn orders_emulated_refs(
6164        &self,
6165        venue: Option<&Venue>,
6166        instrument_id: Option<&InstrumentId>,
6167        strategy_id: Option<&StrategyId>,
6168        account_id: Option<&AccountId>,
6169        side: Option<OrderSide>,
6170    ) -> Vec<OrderRef<'_>> {
6171        let client_order_ids =
6172            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
6173        self.get_orders_for_ids(&client_order_ids, side)
6174    }
6175
6176    /// Returns borrows of all emulated orders matching the optional filter parameters.
6177    ///
6178    /// Prefer [`Self::orders_emulated_refs`] in new native code.
6179    #[must_use]
6180    pub fn orders_emulated(
6181        &self,
6182        venue: Option<&Venue>,
6183        instrument_id: Option<&InstrumentId>,
6184        strategy_id: Option<&StrategyId>,
6185        account_id: Option<&AccountId>,
6186        side: Option<OrderSide>,
6187    ) -> Vec<OrderRef<'_>> {
6188        self.orders_emulated_refs(venue, instrument_id, strategy_id, account_id, side)
6189    }
6190
6191    /// Returns borrows of all in-flight orders matching the optional filter parameters.
6192    #[must_use]
6193    pub fn orders_inflight_refs(
6194        &self,
6195        venue: Option<&Venue>,
6196        instrument_id: Option<&InstrumentId>,
6197        strategy_id: Option<&StrategyId>,
6198        account_id: Option<&AccountId>,
6199        side: Option<OrderSide>,
6200    ) -> Vec<OrderRef<'_>> {
6201        let client_order_ids =
6202            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
6203        self.get_orders_for_ids(&client_order_ids, side)
6204    }
6205
6206    /// Returns borrows of all in-flight orders matching the optional filter parameters.
6207    ///
6208    /// Prefer [`Self::orders_inflight_refs`] in new native code.
6209    #[must_use]
6210    pub fn orders_inflight(
6211        &self,
6212        venue: Option<&Venue>,
6213        instrument_id: Option<&InstrumentId>,
6214        strategy_id: Option<&StrategyId>,
6215        account_id: Option<&AccountId>,
6216        side: Option<OrderSide>,
6217    ) -> Vec<OrderRef<'_>> {
6218        self.orders_inflight_refs(venue, instrument_id, strategy_id, account_id, side)
6219    }
6220
6221    /// Returns borrows of all orders for the `position_id`.
6222    #[must_use]
6223    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<OrderRef<'_>> {
6224        match self.index.position_orders.get(position_id) {
6225            Some(client_order_ids) => self.get_orders_for_ids(client_order_ids, None),
6226            None => Vec::new(),
6227        }
6228    }
6229
6230    /// Returns whether an order with the `client_order_id` exists.
6231    #[must_use]
6232    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
6233        self.index.orders.contains(client_order_id)
6234    }
6235
6236    /// Returns whether an order with the `client_order_id` is open.
6237    #[must_use]
6238    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
6239        self.index.orders_open.contains(client_order_id)
6240    }
6241
6242    /// Returns whether an order with the `client_order_id` is closed.
6243    #[must_use]
6244    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
6245        self.index.orders_closed.contains(client_order_id)
6246    }
6247
6248    /// Returns whether an order with the `client_order_id` is locally active.
6249    ///
6250    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
6251    /// (a superset of emulated orders).
6252    #[must_use]
6253    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
6254        self.index.orders_active_local.contains(client_order_id)
6255    }
6256
6257    /// Returns whether an order with the `client_order_id` is emulated.
6258    #[must_use]
6259    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
6260        self.index.orders_emulated.contains(client_order_id)
6261    }
6262
6263    /// Returns whether an order with the `client_order_id` is in-flight.
6264    #[must_use]
6265    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
6266        self.index.orders_inflight.contains(client_order_id)
6267    }
6268
6269    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
6270    #[must_use]
6271    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
6272        self.index.orders_pending_cancel.contains(client_order_id)
6273    }
6274
6275    /// Returns the count of all open orders.
6276    #[must_use]
6277    pub fn orders_open_count(
6278        &self,
6279        venue: Option<&Venue>,
6280        instrument_id: Option<&InstrumentId>,
6281        strategy_id: Option<&StrategyId>,
6282        account_id: Option<&AccountId>,
6283        side: Option<OrderSide>,
6284    ) -> usize {
6285        self.count_orders_in_bucket(
6286            &self.index.orders_open,
6287            venue,
6288            instrument_id,
6289            strategy_id,
6290            account_id,
6291            side,
6292        )
6293    }
6294
6295    /// Returns the count of all closed orders.
6296    #[must_use]
6297    pub fn orders_closed_count(
6298        &self,
6299        venue: Option<&Venue>,
6300        instrument_id: Option<&InstrumentId>,
6301        strategy_id: Option<&StrategyId>,
6302        account_id: Option<&AccountId>,
6303        side: Option<OrderSide>,
6304    ) -> usize {
6305        self.count_orders_in_bucket(
6306            &self.index.orders_closed,
6307            venue,
6308            instrument_id,
6309            strategy_id,
6310            account_id,
6311            side,
6312        )
6313    }
6314
6315    /// Returns the count of all locally active orders.
6316    ///
6317    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
6318    /// (a superset of emulated orders).
6319    #[must_use]
6320    pub fn orders_active_local_count(
6321        &self,
6322        venue: Option<&Venue>,
6323        instrument_id: Option<&InstrumentId>,
6324        strategy_id: Option<&StrategyId>,
6325        account_id: Option<&AccountId>,
6326        side: Option<OrderSide>,
6327    ) -> usize {
6328        self.count_orders_in_bucket(
6329            &self.index.orders_active_local,
6330            venue,
6331            instrument_id,
6332            strategy_id,
6333            account_id,
6334            side,
6335        )
6336    }
6337
6338    /// Returns the count of all emulated orders.
6339    #[must_use]
6340    pub fn orders_emulated_count(
6341        &self,
6342        venue: Option<&Venue>,
6343        instrument_id: Option<&InstrumentId>,
6344        strategy_id: Option<&StrategyId>,
6345        account_id: Option<&AccountId>,
6346        side: Option<OrderSide>,
6347    ) -> usize {
6348        self.count_orders_in_bucket(
6349            &self.index.orders_emulated,
6350            venue,
6351            instrument_id,
6352            strategy_id,
6353            account_id,
6354            side,
6355        )
6356    }
6357
6358    /// Returns the count of all in-flight orders.
6359    #[must_use]
6360    pub fn orders_inflight_count(
6361        &self,
6362        venue: Option<&Venue>,
6363        instrument_id: Option<&InstrumentId>,
6364        strategy_id: Option<&StrategyId>,
6365        account_id: Option<&AccountId>,
6366        side: Option<OrderSide>,
6367    ) -> usize {
6368        self.count_orders_in_bucket(
6369            &self.index.orders_inflight,
6370            venue,
6371            instrument_id,
6372            strategy_id,
6373            account_id,
6374            side,
6375        )
6376    }
6377
6378    /// Returns the count of all orders.
6379    #[must_use]
6380    pub fn orders_total_count(
6381        &self,
6382        venue: Option<&Venue>,
6383        instrument_id: Option<&InstrumentId>,
6384        strategy_id: Option<&StrategyId>,
6385        account_id: Option<&AccountId>,
6386        side: Option<OrderSide>,
6387    ) -> usize {
6388        self.count_orders_in_bucket(
6389            &self.index.orders,
6390            venue,
6391            instrument_id,
6392            strategy_id,
6393            account_id,
6394            side,
6395        )
6396    }
6397
6398    /// Returns whether any open order matches the optional filter parameters.
6399    ///
6400    /// Short-circuits on the first match, avoiding the full intersection walk performed by
6401    /// [`Self::orders_open_count`]. Prefer this over `orders_open_count(...) > 0` when only
6402    /// existence matters.
6403    #[must_use]
6404    pub fn has_orders_open(
6405        &self,
6406        venue: Option<&Venue>,
6407        instrument_id: Option<&InstrumentId>,
6408        strategy_id: Option<&StrategyId>,
6409        account_id: Option<&AccountId>,
6410        side: Option<OrderSide>,
6411    ) -> bool {
6412        self.any_orders_in_bucket(
6413            &self.index.orders_open,
6414            venue,
6415            instrument_id,
6416            strategy_id,
6417            account_id,
6418            side,
6419        )
6420    }
6421
6422    /// Returns whether any closed order matches the optional filter parameters.
6423    #[must_use]
6424    pub fn has_orders_closed(
6425        &self,
6426        venue: Option<&Venue>,
6427        instrument_id: Option<&InstrumentId>,
6428        strategy_id: Option<&StrategyId>,
6429        account_id: Option<&AccountId>,
6430        side: Option<OrderSide>,
6431    ) -> bool {
6432        self.any_orders_in_bucket(
6433            &self.index.orders_closed,
6434            venue,
6435            instrument_id,
6436            strategy_id,
6437            account_id,
6438            side,
6439        )
6440    }
6441
6442    /// Returns whether any locally active order matches the optional filter parameters.
6443    ///
6444    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state.
6445    #[must_use]
6446    pub fn has_orders_active_local(
6447        &self,
6448        venue: Option<&Venue>,
6449        instrument_id: Option<&InstrumentId>,
6450        strategy_id: Option<&StrategyId>,
6451        account_id: Option<&AccountId>,
6452        side: Option<OrderSide>,
6453    ) -> bool {
6454        self.any_orders_in_bucket(
6455            &self.index.orders_active_local,
6456            venue,
6457            instrument_id,
6458            strategy_id,
6459            account_id,
6460            side,
6461        )
6462    }
6463
6464    /// Returns whether any emulated order matches the optional filter parameters.
6465    #[must_use]
6466    pub fn has_orders_emulated(
6467        &self,
6468        venue: Option<&Venue>,
6469        instrument_id: Option<&InstrumentId>,
6470        strategy_id: Option<&StrategyId>,
6471        account_id: Option<&AccountId>,
6472        side: Option<OrderSide>,
6473    ) -> bool {
6474        self.any_orders_in_bucket(
6475            &self.index.orders_emulated,
6476            venue,
6477            instrument_id,
6478            strategy_id,
6479            account_id,
6480            side,
6481        )
6482    }
6483
6484    /// Returns whether any in-flight order matches the optional filter parameters.
6485    #[must_use]
6486    pub fn has_orders_inflight(
6487        &self,
6488        venue: Option<&Venue>,
6489        instrument_id: Option<&InstrumentId>,
6490        strategy_id: Option<&StrategyId>,
6491        account_id: Option<&AccountId>,
6492        side: Option<OrderSide>,
6493    ) -> bool {
6494        self.any_orders_in_bucket(
6495            &self.index.orders_inflight,
6496            venue,
6497            instrument_id,
6498            strategy_id,
6499            account_id,
6500            side,
6501        )
6502    }
6503
6504    /// Returns whether any order (in any state) matches the optional filter parameters.
6505    #[must_use]
6506    pub fn has_orders(
6507        &self,
6508        venue: Option<&Venue>,
6509        instrument_id: Option<&InstrumentId>,
6510        strategy_id: Option<&StrategyId>,
6511        account_id: Option<&AccountId>,
6512        side: Option<OrderSide>,
6513    ) -> bool {
6514        self.any_orders_in_bucket(
6515            &self.index.orders,
6516            venue,
6517            instrument_id,
6518            strategy_id,
6519            account_id,
6520            side,
6521        )
6522    }
6523
6524    /// Returns the order list for the `order_list_id`.
6525    #[must_use]
6526    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
6527        self.order_lists.get(order_list_id)
6528    }
6529
6530    /// Returns the order list for the `order_list_id`.
6531    ///
6532    /// # Errors
6533    ///
6534    /// Returns [`OrderListLookupError::NotFound`] when the order list is not present in the cache.
6535    pub fn try_order_list(
6536        &self,
6537        order_list_id: &OrderListId,
6538    ) -> Result<&OrderList, OrderListLookupError> {
6539        self.order_lists
6540            .get(order_list_id)
6541            .ok_or_else(|| OrderListLookupError::not_found(*order_list_id))
6542    }
6543
6544    /// Returns all order lists matching the optional filter parameters.
6545    #[must_use]
6546    pub fn order_lists(
6547        &self,
6548        venue: Option<&Venue>,
6549        instrument_id: Option<&InstrumentId>,
6550        strategy_id: Option<&StrategyId>,
6551        account_id: Option<&AccountId>,
6552    ) -> Vec<&OrderList> {
6553        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
6554
6555        if let Some(venue) = venue {
6556            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
6557        }
6558
6559        if let Some(instrument_id) = instrument_id {
6560            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
6561        }
6562
6563        if let Some(strategy_id) = strategy_id {
6564            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
6565        }
6566
6567        if let Some(account_id) = account_id {
6568            order_lists.retain(|ol| {
6569                ol.client_order_ids.iter().any(|client_order_id| {
6570                    self.orders.get(client_order_id).is_some_and(|order_cell| {
6571                        order_cell.borrow().account_id().as_ref() == Some(account_id)
6572                    })
6573                })
6574            });
6575        }
6576
6577        order_lists
6578    }
6579
6580    /// Returns whether an order list with the `order_list_id` exists.
6581    #[must_use]
6582    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
6583        self.order_lists.contains_key(order_list_id)
6584    }
6585
6586    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
6587
6588    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
6589    /// optional filter parameters.
6590    #[must_use]
6591    pub fn orders_for_exec_algorithm(
6592        &self,
6593        exec_algorithm_id: &ExecAlgorithmId,
6594        venue: Option<&Venue>,
6595        instrument_id: Option<&InstrumentId>,
6596        strategy_id: Option<&StrategyId>,
6597        account_id: Option<&AccountId>,
6598        side: Option<OrderSide>,
6599    ) -> Vec<OrderRef<'_>> {
6600        let Some(exec_algorithm_order_ids) =
6601            self.index.exec_algorithm_orders.get(exec_algorithm_id)
6602        else {
6603            return Vec::new();
6604        };
6605
6606        let filtered = self.query_orders_in_bucket(
6607            exec_algorithm_order_ids,
6608            venue,
6609            instrument_id,
6610            strategy_id,
6611            account_id,
6612        );
6613        self.get_orders_for_ids(&filtered, side)
6614    }
6615
6616    /// Returns references to all orders with the `exec_spawn_id`.
6617    #[must_use]
6618    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<OrderRef<'_>> {
6619        match self.index.exec_spawn_orders.get(exec_spawn_id) {
6620            Some(ids) => self.get_orders_for_ids(ids, None),
6621            None => Vec::new(),
6622        }
6623    }
6624
6625    /// Returns the total order quantity for the `exec_spawn_id`.
6626    #[must_use]
6627    pub fn exec_spawn_total_quantity(
6628        &self,
6629        exec_spawn_id: &ClientOrderId,
6630        active_only: bool,
6631    ) -> Option<Quantity> {
6632        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6633
6634        let mut total_quantity: Option<Quantity> = None;
6635
6636        for spawn_order in exec_spawn_orders {
6637            if active_only && spawn_order.is_closed() {
6638                continue;
6639            }
6640
6641            match total_quantity.as_mut() {
6642                Some(total) => *total = *total + spawn_order.quantity(),
6643                None => total_quantity = Some(spawn_order.quantity()),
6644            }
6645        }
6646
6647        total_quantity
6648    }
6649
6650    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
6651    #[must_use]
6652    pub fn exec_spawn_total_filled_qty(
6653        &self,
6654        exec_spawn_id: &ClientOrderId,
6655        active_only: bool,
6656    ) -> Option<Quantity> {
6657        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6658
6659        let mut total_quantity: Option<Quantity> = None;
6660
6661        for spawn_order in exec_spawn_orders {
6662            if active_only && spawn_order.is_closed() {
6663                continue;
6664            }
6665
6666            match total_quantity.as_mut() {
6667                Some(total) => *total = *total + spawn_order.filled_qty(),
6668                None => total_quantity = Some(spawn_order.filled_qty()),
6669            }
6670        }
6671
6672        total_quantity
6673    }
6674
6675    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
6676    #[must_use]
6677    pub fn exec_spawn_total_leaves_qty(
6678        &self,
6679        exec_spawn_id: &ClientOrderId,
6680        active_only: bool,
6681    ) -> Option<Quantity> {
6682        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
6683
6684        let mut total_quantity: Option<Quantity> = None;
6685
6686        for spawn_order in exec_spawn_orders {
6687            if active_only && spawn_order.is_closed() {
6688                continue;
6689            }
6690
6691            match total_quantity.as_mut() {
6692                Some(total) => *total = *total + spawn_order.leaves_qty(),
6693                None => total_quantity = Some(spawn_order.leaves_qty()),
6694            }
6695        }
6696
6697        total_quantity
6698    }
6699
6700    // -- POSITION QUERIES ------------------------------------------------------------------------
6701
6702    /// Returns a borrow of the position with the `position_id` (if found).
6703    #[must_use]
6704    pub fn position_ref(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
6705        self.positions
6706            .get(position_id)
6707            .map(|position_cell| PositionRef::new(position_cell.borrow()))
6708    }
6709
    /// Returns a borrow of the position with the `position_id` (if found).
    ///
    /// Thin alias that forwards directly to [`Self::position_ref`]; both return the same value.
    /// Prefer [`Self::position_ref`] in new native code.
    #[must_use]
    pub fn position(&self, position_id: &PositionId) -> Option<PositionRef<'_>> {
        self.position_ref(position_id)
    }
6717
6718    /// Returns a borrow of the position with the `position_id`.
6719    ///
6720    /// # Errors
6721    ///
6722    /// Returns [`PositionLookupError::NotFound`] when the position is not present in the cache.
6723    pub fn try_position_ref(
6724        &self,
6725        position_id: &PositionId,
6726    ) -> Result<PositionRef<'_>, PositionLookupError> {
6727        self.positions
6728            .get(position_id)
6729            .map(|position_cell| PositionRef::new(position_cell.borrow()))
6730            .ok_or_else(|| PositionLookupError::not_found(*position_id))
6731    }
6732
    /// Returns a borrow of the position with the `position_id`.
    ///
    /// Thin alias that forwards directly to [`Self::try_position_ref`].
    /// Prefer [`Self::try_position_ref`] in new native code.
    ///
    /// # Errors
    ///
    /// Returns [`PositionLookupError::NotFound`] when the position is not present in the cache.
    pub fn try_position(
        &self,
        position_id: &PositionId,
    ) -> Result<PositionRef<'_>, PositionLookupError> {
        self.try_position_ref(position_id)
    }
6746
6747    /// Gets an exclusive write borrow of the position with the `position_id` (if found).
6748    ///
6749    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
6750    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
6751    /// exposes immutable cache borrows and therefore cannot reach this method.
6752    ///
6753    /// While the returned [`PositionRefMut`] is alive, no other read or write of the same position
6754    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
6755    /// that may re-enter the same position.
6756    #[must_use]
6757    pub fn position_mut(&mut self, position_id: &PositionId) -> Option<PositionRefMut<'_>> {
6758        self.positions
6759            .get(position_id)
6760            .map(|position_cell| PositionRefMut::new(position_cell.borrow_mut()))
6761    }
6762
6763    /// Gets an owned copy of the position with the `position_id` (if found).
6764    ///
6765    /// Use when downstream needs an owned [`Position`] that crosses a boundary. The copy will not
6766    /// reflect later cache mutations.
6767    #[must_use]
6768    pub fn position_owned(&self, position_id: &PositionId) -> Option<Position> {
6769        self.positions
6770            .get(position_id)
6771            .map(|position_cell| position_cell.borrow().clone())
6772    }
6773
6774    /// Returns a borrow of the position for the `client_order_id` (if found).
6775    #[must_use]
6776    pub fn position_for_order_ref(
6777        &self,
6778        client_order_id: &ClientOrderId,
6779    ) -> Option<PositionRef<'_>> {
6780        self.index
6781            .order_position
6782            .get(client_order_id)
6783            .and_then(|position_id| self.positions.get(position_id))
6784            .map(|position_cell| PositionRef::new(position_cell.borrow()))
6785    }
6786
    /// Returns a borrow of the position for the `client_order_id` (if found).
    ///
    /// Thin alias that forwards directly to [`Self::position_for_order_ref`].
    /// Prefer [`Self::position_for_order_ref`] in new native code.
    #[must_use]
    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<PositionRef<'_>> {
        self.position_for_order_ref(client_order_id)
    }
6794
    /// Returns a reference to the position ID for the `client_order_id` (if found).
    ///
    /// Reads only the order-to-position index; the position itself is not looked up, so a
    /// returned ID is not verified against the position store here.
    #[must_use]
    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
        self.index.order_position.get(client_order_id)
    }
6800
6801    /// Returns borrows of all positions matching the optional filter parameters.
6802    ///
6803    /// Each [`PositionRef`] in the returned vector borrows its underlying cell; mutating any of
6804    /// those positions while the vector is alive will panic at runtime. Drop the vector before
6805    /// issuing writes.
6806    #[must_use]
6807    pub fn positions_refs(
6808        &self,
6809        venue: Option<&Venue>,
6810        instrument_id: Option<&InstrumentId>,
6811        strategy_id: Option<&StrategyId>,
6812        account_id: Option<&AccountId>,
6813        side: Option<PositionSide>,
6814    ) -> Vec<PositionRef<'_>> {
6815        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
6816        self.get_positions_for_ids(&position_ids, side)
6817    }
6818
    /// Returns borrows of all positions matching the optional filter parameters.
    ///
    /// Thin alias that forwards every argument unchanged to [`Self::positions_refs`].
    /// Prefer [`Self::positions_refs`] in new native code.
    #[must_use]
    pub fn positions(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> Vec<PositionRef<'_>> {
        self.positions_refs(venue, instrument_id, strategy_id, account_id, side)
    }
6833
6834    /// Returns borrows of all open positions matching the optional filter parameters.
6835    #[must_use]
6836    pub fn positions_open_refs(
6837        &self,
6838        venue: Option<&Venue>,
6839        instrument_id: Option<&InstrumentId>,
6840        strategy_id: Option<&StrategyId>,
6841        account_id: Option<&AccountId>,
6842        side: Option<PositionSide>,
6843    ) -> Vec<PositionRef<'_>> {
6844        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
6845        self.get_positions_for_ids(&position_ids, side)
6846    }
6847
    /// Returns borrows of all open positions matching the optional filter parameters.
    ///
    /// Thin alias that forwards every argument unchanged to [`Self::positions_open_refs`].
    /// Prefer [`Self::positions_open_refs`] in new native code.
    #[must_use]
    pub fn positions_open(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> Vec<PositionRef<'_>> {
        self.positions_open_refs(venue, instrument_id, strategy_id, account_id, side)
    }
6862
6863    /// Returns borrows of all closed positions matching the optional filter parameters.
6864    #[must_use]
6865    pub fn positions_closed_refs(
6866        &self,
6867        venue: Option<&Venue>,
6868        instrument_id: Option<&InstrumentId>,
6869        strategy_id: Option<&StrategyId>,
6870        account_id: Option<&AccountId>,
6871        side: Option<PositionSide>,
6872    ) -> Vec<PositionRef<'_>> {
6873        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
6874        self.get_positions_for_ids(&position_ids, side)
6875    }
6876
    /// Returns borrows of all closed positions matching the optional filter parameters.
    ///
    /// Thin alias that forwards every argument unchanged to [`Self::positions_closed_refs`].
    /// Prefer [`Self::positions_closed_refs`] in new native code.
    #[must_use]
    pub fn positions_closed(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> Vec<PositionRef<'_>> {
        self.positions_closed_refs(venue, instrument_id, strategy_id, account_id, side)
    }
6891
    /// Returns whether a position with the `position_id` exists.
    ///
    /// Answered from the `positions` index rather than from the position store itself.
    #[must_use]
    pub fn position_exists(&self, position_id: &PositionId) -> bool {
        self.index.positions.contains(position_id)
    }
6897
    /// Returns whether a position with the `position_id` is open.
    ///
    /// Answered from the `positions_open` index; an ID that is not in that index (including an
    /// unknown ID) yields `false`.
    #[must_use]
    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
        self.index.positions_open.contains(position_id)
    }
6903
    /// Returns whether a position with the `position_id` is closed.
    ///
    /// Answered from the `positions_closed` index; an ID that is not in that index (including an
    /// unknown ID) yields `false`.
    #[must_use]
    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
        self.index.positions_closed.contains(position_id)
    }
6909
    /// Returns the count of open positions matching the optional filter parameters.
    ///
    /// Counts within the `positions_open` index; all filters are forwarded unchanged to
    /// `count_positions_in_bucket`.
    #[must_use]
    pub fn positions_open_count(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> usize {
        self.count_positions_in_bucket(
            &self.index.positions_open,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
6929
    /// Returns the count of closed positions matching the optional filter parameters.
    ///
    /// Counts within the `positions_closed` index; all filters are forwarded unchanged to
    /// `count_positions_in_bucket`.
    #[must_use]
    pub fn positions_closed_count(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> usize {
        self.count_positions_in_bucket(
            &self.index.positions_closed,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
6949
    /// Returns the count of all positions (open or closed) matching the optional filter
    /// parameters.
    ///
    /// Counts within the `positions` index; all filters are forwarded unchanged to
    /// `count_positions_in_bucket`.
    #[must_use]
    pub fn positions_total_count(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> usize {
        self.count_positions_in_bucket(
            &self.index.positions,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
6969
    /// Returns whether any open position matches the optional filter parameters.
    ///
    /// Short-circuits on the first match, avoiding the full intersection walk performed by
    /// [`Self::positions_open_count`]. Prefer this over `positions_open_count(...) > 0` when
    /// only existence matters.
    ///
    /// Searches the `positions_open` index; all filters are forwarded unchanged to
    /// `any_positions_in_bucket`.
    #[must_use]
    pub fn has_positions_open(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> bool {
        self.any_positions_in_bucket(
            &self.index.positions_open,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
6993
    /// Returns whether any closed position matches the optional filter parameters.
    ///
    /// Searches the `positions_closed` index; all filters are forwarded unchanged to
    /// `any_positions_in_bucket`.
    #[must_use]
    pub fn has_positions_closed(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> bool {
        self.any_positions_in_bucket(
            &self.index.positions_closed,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
7013
    /// Returns whether any position (open or closed) matches the optional filter parameters.
    ///
    /// Searches the `positions` index; all filters are forwarded unchanged to
    /// `any_positions_in_bucket`.
    #[must_use]
    pub fn has_positions(
        &self,
        venue: Option<&Venue>,
        instrument_id: Option<&InstrumentId>,
        strategy_id: Option<&StrategyId>,
        account_id: Option<&AccountId>,
        side: Option<PositionSide>,
    ) -> bool {
        self.any_positions_in_bucket(
            &self.index.positions,
            venue,
            instrument_id,
            strategy_id,
            account_id,
            side,
        )
    }
7033
7034    // -- STRATEGY QUERIES ------------------------------------------------------------------------
7035
    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
    ///
    /// Reads the order-to-strategy index only; returns `None` when the order has no entry there.
    #[must_use]
    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
        self.index.order_strategy.get(client_order_id)
    }
7041
    /// Gets a reference to the strategy ID for the `position_id` (if found).
    ///
    /// Reads the position-to-strategy index only; returns `None` when the position has no entry
    /// there.
    #[must_use]
    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
        self.index.position_strategy.get(position_id)
    }
7047
7048    // -- GENERAL ---------------------------------------------------------------------------------
7049
    /// Gets a reference to the general value for the `key` (if found).
    ///
    /// Returns `Ok(None)` when the key is valid but no value is stored under it.
    ///
    /// # Errors
    ///
    /// Returns an error if the `key` is invalid, i.e. it fails `check_valid_string_ascii`.
    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
        // Validate before touching the store so invalid keys surface as errors, not as `None`
        check_valid_string_ascii(key, stringify!(key))?;

        Ok(self.general.get(key))
    }
7060
7061    // -- DATA QUERIES ----------------------------------------------------------------------------
7062
7063    /// Returns the price for the `instrument_id` and `price_type` (if found).
7064    ///
7065    /// # Panics
7066    ///
7067    /// Panics if `price_type` is [`PriceType::Mid`] and the quote price precision is already at
7068    /// the maximum fixed precision.
7069    #[must_use]
7070    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
7071        match price_type {
7072            PriceType::Bid => self
7073                .quotes
7074                .get(instrument_id)
7075                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
7076            PriceType::Ask => self
7077                .quotes
7078                .get(instrument_id)
7079                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
7080            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
7081                quotes.front().map(|quote| {
7082                    let mid = (quote.ask_price.as_decimal() + quote.bid_price.as_decimal())
7083                        / Decimal::TWO;
7084
7085                    Price::from_decimal_dp(mid, quote.bid_price.precision + 1)
7086                        .expect("Invalid mid price for Cache::price")
7087                })
7088            }),
7089            PriceType::Last => self
7090                .trades
7091                .get(instrument_id)
7092                .and_then(|trades| trades.front().map(|trade| trade.price)),
7093            PriceType::Mark => self
7094                .mark_prices
7095                .get(instrument_id)
7096                .and_then(|marks| marks.front().map(|mark| mark.value)),
7097        }
7098    }
7099
7100    /// Gets all quotes for the `instrument_id`.
7101    #[must_use]
7102    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
7103        self.quotes
7104            .get(instrument_id)
7105            .map(|quotes| quotes.iter().copied().collect())
7106    }
7107
7108    /// Gets all trades for the `instrument_id`.
7109    #[must_use]
7110    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
7111        self.trades
7112            .get(instrument_id)
7113            .map(|trades| trades.iter().copied().collect())
7114    }
7115
7116    /// Gets all mark price updates for the `instrument_id`.
7117    #[must_use]
7118    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
7119        self.mark_prices
7120            .get(instrument_id)
7121            .map(|mark_prices| mark_prices.iter().copied().collect())
7122    }
7123
7124    /// Gets all index price updates for the `instrument_id`.
7125    #[must_use]
7126    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
7127        self.index_prices
7128            .get(instrument_id)
7129            .map(|index_prices| index_prices.iter().copied().collect())
7130    }
7131
7132    /// Gets all funding rate updates for the `instrument_id`.
7133    #[must_use]
7134    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
7135        self.funding_rates
7136            .get(instrument_id)
7137            .map(|funding_rates| funding_rates.iter().copied().collect())
7138    }
7139
7140    /// Gets all instrument status updates for the `instrument_id`.
7141    #[must_use]
7142    pub fn instrument_statuses(
7143        &self,
7144        instrument_id: &InstrumentId,
7145    ) -> Option<Vec<InstrumentStatus>> {
7146        self.instrument_statuses
7147            .get(instrument_id)
7148            .map(|statuses| statuses.iter().copied().collect())
7149    }
7150
7151    /// Gets all bars for the `bar_type`.
7152    #[must_use]
7153    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
7154        self.bars
7155            .get(bar_type)
7156            .map(|bars| bars.iter().copied().collect())
7157    }
7158
    /// Gets a reference to the order book for the `instrument_id` (if found).
    ///
    /// Returns `None` when no order book is cached for the instrument.
    #[must_use]
    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
        self.books.get(instrument_id)
    }
7164
7165    /// Gets a reference to the order book for the `instrument_id`.
7166    ///
7167    /// # Errors
7168    ///
7169    /// Returns [`OrderBookLookupError::NotFound`] when the order book is not present in the cache.
7170    pub fn try_order_book(
7171        &self,
7172        instrument_id: &InstrumentId,
7173    ) -> Result<&OrderBook, OrderBookLookupError> {
7174        self.books
7175            .get(instrument_id)
7176            .ok_or_else(|| OrderBookLookupError::not_found(*instrument_id))
7177    }
7178
    /// Gets a mutable reference to the order book for the `instrument_id` (if found).
    ///
    /// Returns `None` when no order book is cached for the instrument.
    #[must_use]
    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
        self.books.get_mut(instrument_id)
    }
7184
    /// Gets a reference to the own order book for the `instrument_id` (if found).
    ///
    /// Returns `None` when no own order book is cached for the instrument.
    #[must_use]
    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
        self.own_books.get(instrument_id)
    }
7190
7191    /// Gets a reference to the own order book for the `instrument_id`.
7192    ///
7193    /// # Errors
7194    ///
7195    /// Returns [`OwnOrderBookLookupError::NotFound`] when the own order book is not present in the
7196    /// cache.
7197    pub fn try_own_order_book(
7198        &self,
7199        instrument_id: &InstrumentId,
7200    ) -> Result<&OwnOrderBook, OwnOrderBookLookupError> {
7201        self.own_books
7202            .get(instrument_id)
7203            .ok_or_else(|| OwnOrderBookLookupError::not_found(*instrument_id))
7204    }
7205
    /// Gets a mutable reference to the own order book for the `instrument_id` (if found).
    ///
    /// Returns `None` when no own order book is cached for the instrument.
    #[must_use]
    pub fn own_order_book_mut(
        &mut self,
        instrument_id: &InstrumentId,
    ) -> Option<&mut OwnOrderBook> {
        self.own_books.get_mut(instrument_id)
    }
7214
7215    /// Gets a reference to the latest quote for the `instrument_id`.
7216    #[must_use]
7217    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
7218        self.quotes
7219            .get(instrument_id)
7220            .and_then(|quotes| quotes.front())
7221    }
7222
7223    /// Gets a reference to the quote at `index` for the `instrument_id`.
7224    ///
7225    /// Index 0 is the most recent.
7226    #[must_use]
7227    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
7228        self.quotes
7229            .get(instrument_id)
7230            .and_then(|quotes| quotes.get(index))
7231    }
7232
7233    /// Gets a reference to the latest trade for the `instrument_id`.
7234    #[must_use]
7235    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
7236        self.trades
7237            .get(instrument_id)
7238            .and_then(|trades| trades.front())
7239    }
7240
7241    /// Gets a reference to the trade at `index` for the `instrument_id`.
7242    ///
7243    /// Index 0 is the most recent.
7244    #[must_use]
7245    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
7246        self.trades
7247            .get(instrument_id)
7248            .and_then(|trades| trades.get(index))
7249    }
7250
7251    /// Gets a reference to the latest mark price update for the `instrument_id`.
7252    #[must_use]
7253    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
7254        self.mark_prices
7255            .get(instrument_id)
7256            .and_then(|mark_prices| mark_prices.front())
7257    }
7258
7259    /// Gets a reference to the latest index price update for the `instrument_id`.
7260    #[must_use]
7261    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
7262        self.index_prices
7263            .get(instrument_id)
7264            .and_then(|index_prices| index_prices.front())
7265    }
7266
7267    /// Gets a reference to the latest funding rate update for the `instrument_id`.
7268    #[must_use]
7269    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
7270        self.funding_rates
7271            .get(instrument_id)
7272            .and_then(|funding_rates| funding_rates.front())
7273    }
7274
7275    /// Gets a reference to the latest instrument status update for the `instrument_id`.
7276    #[must_use]
7277    pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
7278        self.instrument_statuses
7279            .get(instrument_id)
7280            .and_then(|statuses| statuses.front())
7281    }
7282
7283    /// Gets a reference to the latest bar for the `bar_type`.
7284    #[must_use]
7285    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
7286        self.bars.get(bar_type).and_then(|bars| bars.front())
7287    }
7288
7289    /// Gets a reference to the bar at `index` for the `bar_type`.
7290    ///
7291    /// Index 0 is the most recent.
7292    #[must_use]
7293    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
7294        self.bars.get(bar_type).and_then(|bars| bars.get(index))
7295    }
7296
7297    /// Gets the order book update count for the `instrument_id`.
7298    #[must_use]
7299    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
7300        self.books
7301            .get(instrument_id)
7302            .map_or(0, |book| book.update_count) as usize
7303    }
7304
7305    /// Gets the quote tick count for the `instrument_id`.
7306    #[must_use]
7307    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
7308        self.quotes
7309            .get(instrument_id)
7310            .map_or(0, BoundedVecDeque::len)
7311    }
7312
7313    /// Gets the trade tick count for the `instrument_id`.
7314    #[must_use]
7315    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
7316        self.trades
7317            .get(instrument_id)
7318            .map_or(0, BoundedVecDeque::len)
7319    }
7320
7321    /// Gets the bar count for the `instrument_id`.
7322    #[must_use]
7323    pub fn bar_count(&self, bar_type: &BarType) -> usize {
7324        self.bars.get(bar_type).map_or(0, BoundedVecDeque::len)
7325    }
7326
    /// Returns whether the cache contains an order book for the `instrument_id`.
    ///
    /// This is a key-presence check only; the contents of the book are not inspected.
    #[must_use]
    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
        self.books.contains_key(instrument_id)
    }
7332
    /// Returns whether the cache contains quotes for the `instrument_id`.
    ///
    /// Equivalent to `quote_count(instrument_id) > 0`, so an instrument whose quote buffer exists
    /// but is empty reports `false`.
    #[must_use]
    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
        self.quote_count(instrument_id) > 0
    }
7338
    /// Returns whether the cache contains trades for the `instrument_id`.
    ///
    /// Equivalent to `trade_count(instrument_id) > 0`, so an instrument whose trade buffer exists
    /// but is empty reports `false`.
    #[must_use]
    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
        self.trade_count(instrument_id) > 0
    }
7344
    /// Returns whether the cache contains bars for the `bar_type`.
    ///
    /// Equivalent to `bar_count(bar_type) > 0`, so a bar type whose buffer exists but is empty
    /// reports `false`.
    #[must_use]
    pub fn has_bars(&self, bar_type: &BarType) -> bool {
        self.bar_count(bar_type) > 0
    }
7350
7351    #[must_use]
7352    pub fn get_xrate(
7353        &self,
7354        venue: Venue,
7355        from_currency: Currency,
7356        to_currency: Currency,
7357        price_type: PriceType,
7358    ) -> Option<Decimal> {
7359        if from_currency == to_currency {
7360            // When the source and target currencies are identical,
7361            // no conversion is needed; return an exchange rate of one.
7362            return Some(Decimal::ONE);
7363        }
7364
7365        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
7366
7367        match get_exchange_rate(
7368            from_currency.code,
7369            to_currency.code,
7370            price_type,
7371            bid_quote,
7372            ask_quote,
7373        ) {
7374            Ok(rate) => rate,
7375            Err(e) => {
7376                log::error!("Failed to calculate xrate: {e}");
7377                None
7378            }
7379        }
7380    }
7381
7382    fn build_quote_table(
7383        &self,
7384        venue: &Venue,
7385    ) -> (AHashMap<Ustr, Decimal>, AHashMap<Ustr, Decimal>) {
7386        let mut bid_quotes = AHashMap::new();
7387        let mut ask_quotes = AHashMap::new();
7388
7389        for instrument_id in self.instruments.keys() {
7390            if instrument_id.venue != *venue {
7391                continue;
7392            }
7393
7394            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
7395                if let Some(tick) = ticks.front() {
7396                    (tick.bid_price, tick.ask_price)
7397                } else {
7398                    continue; // Empty ticks vector
7399                }
7400            } else {
7401                let bid_bar = self
7402                    .bars
7403                    .iter()
7404                    .find(|(k, _)| {
7405                        k.instrument_id() == *instrument_id
7406                            && matches!(k.spec().price_type, PriceType::Bid)
7407                    })
7408                    .map(|(_, v)| v);
7409
7410                let ask_bar = self
7411                    .bars
7412                    .iter()
7413                    .find(|(k, _)| {
7414                        k.instrument_id() == *instrument_id
7415                            && matches!(k.spec().price_type, PriceType::Ask)
7416                    })
7417                    .map(|(_, v)| v);
7418
7419                match (bid_bar, ask_bar) {
7420                    (Some(bid), Some(ask)) => {
7421                        match (bid.front(), ask.front()) {
7422                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
7423                            _ => {
7424                                // Empty bar VecDeques
7425                                continue;
7426                            }
7427                        }
7428                    }
7429                    _ => continue,
7430                }
7431            };
7432
7433            bid_quotes.insert(instrument_id.symbol.inner(), bid_price.as_decimal());
7434            ask_quotes.insert(instrument_id.symbol.inner(), ask_price.as_decimal());
7435        }
7436
7437        (bid_quotes, ask_quotes)
7438    }
7439
    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
    ///
    /// The lookup is directional: only the `(from_currency, to_currency)` entry is consulted.
    #[must_use]
    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
        self.mark_xrates.get(&(from_currency, to_currency)).copied()
    }
7445
7446    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
7447    ///
7448    /// # Panics
7449    ///
7450    /// Panics if `xrate` is not positive.
7451    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
7452        assert!(xrate > 0.0, "xrate was zero");
7453        self.mark_xrates.insert((from_currency, to_currency), xrate);
7454        self.mark_xrates
7455            .insert((to_currency, from_currency), 1.0 / xrate);
7456    }
7457
    /// Clears the mark exchange rate for the given currency pair.
    ///
    /// Only the `(from_currency, to_currency)` entry is removed; clearing a pair that is not set
    /// is a no-op.
    ///
    /// NOTE(review): [`Self::set_mark_xrate`] also stores the inverse
    /// `(to_currency, from_currency)` entry, which is left in place here. Confirm whether
    /// callers are expected to clear the inverse separately.
    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
    }
7462
    /// Clears all mark exchange rates.
    ///
    /// Every stored pair is removed, in both directions.
    pub fn clear_mark_xrates(&mut self) {
        self.mark_xrates.clear();
    }
7467
    /// Returns a reference to the currency for the `code` (if found).
    ///
    /// Returns `None` when no currency is cached under `code`.
    #[must_use]
    pub fn currency(&self, code: &Ustr) -> Option<&Currency> {
        self.currencies.get(code)
    }
7473
7474    /// Returns a reference to the currency for the `code`.
7475    ///
7476    /// # Errors
7477    ///
7478    /// Returns [`CurrencyLookupError::NotFound`] when the currency is not present in the cache.
7479    pub fn try_currency(&self, code: &Ustr) -> Result<&Currency, CurrencyLookupError> {
7480        self.currencies
7481            .get(code)
7482            .ok_or_else(|| CurrencyLookupError::not_found(*code))
7483    }
7484
7485    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
7486
    /// Returns a reference to the instrument for the `instrument_id` (if found).
    ///
    /// Returns `None` when no instrument is cached under `instrument_id`.
    #[must_use]
    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
        self.instruments.get(instrument_id)
    }
7492
7493    /// Returns a reference to the instrument for the `instrument_id`.
7494    ///
7495    /// # Errors
7496    ///
7497    /// Returns [`InstrumentLookupError::NotFound`] when the instrument is not present in the cache.
7498    pub fn try_instrument(
7499        &self,
7500        instrument_id: &InstrumentId,
7501    ) -> Result<&InstrumentAny, InstrumentLookupError> {
7502        self.instruments
7503            .get(instrument_id)
7504            .ok_or_else(|| InstrumentLookupError::not_found(*instrument_id))
7505    }
7506
7507    /// Returns references to all instrument IDs for the `venue`.
7508    #[must_use]
7509    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
7510        match venue {
7511            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
7512            None => self.instruments.keys().collect(),
7513        }
7514    }
7515
7516    /// Returns references to all instruments for the `venue`.
7517    #[must_use]
7518    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
7519        self.instruments
7520            .values()
7521            .filter(|i| &i.id().venue == venue)
7522            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
7523            .collect()
7524    }
7525
7526    /// Returns references to all instruments for the `venue` whose underlying
7527    /// equals `root` and whose [`InstrumentClass`] equals `class`.
7528    ///
7529    /// Use when expanding a parent-symbol subscription: filtering by class as
7530    /// well as root prevents leaves of a different class (e.g. options when
7531    /// the user asked for futures, or vice versa) from being pulled in.
7532    #[must_use]
7533    pub fn instruments_by_parent(
7534        &self,
7535        venue: &Venue,
7536        root: &Ustr,
7537        class: InstrumentClass,
7538    ) -> Vec<&InstrumentAny> {
7539        self.instruments
7540            .values()
7541            .filter(|i| &i.id().venue == venue)
7542            .filter(|i| i.underlying() == Some(*root))
7543            .filter(|i| i.instrument_class() == class)
7544            .collect()
7545    }
7546
7547    /// Returns references to all bar types contained in the cache.
7548    #[must_use]
7549    pub fn bar_types(
7550        &self,
7551        instrument_id: Option<&InstrumentId>,
7552        price_type: Option<&PriceType>,
7553        aggregation_source: AggregationSource,
7554    ) -> Vec<&BarType> {
7555        let mut bar_types = self
7556            .bars
7557            .keys()
7558            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
7559            .collect::<Vec<&BarType>>();
7560
7561        if let Some(instrument_id) = instrument_id {
7562            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
7563        }
7564
7565        if let Some(price_type) = price_type {
7566            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
7567        }
7568
7569        bar_types
7570    }
7571
7572    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
7573
7574    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
7575    #[must_use]
7576    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
7577        self.synthetics.get(instrument_id)
7578    }
7579
7580    /// Returns a reference to the synthetic instrument for the `instrument_id`.
7581    ///
7582    /// # Errors
7583    ///
7584    /// Returns [`SyntheticInstrumentLookupError::NotFound`] when the synthetic instrument is not
7585    /// present in the cache.
7586    pub fn try_synthetic(
7587        &self,
7588        instrument_id: &InstrumentId,
7589    ) -> Result<&SyntheticInstrument, SyntheticInstrumentLookupError> {
7590        self.synthetics
7591            .get(instrument_id)
7592            .ok_or_else(|| SyntheticInstrumentLookupError::not_found(*instrument_id))
7593    }
7594
7595    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
7596    #[must_use]
7597    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
7598        self.synthetics.keys().collect()
7599    }
7600
7601    /// Returns references to all synthetic instruments contained in the cache.
7602    #[must_use]
7603    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
7604        self.synthetics.values().collect()
7605    }
7606
7607    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
7608
7609    /// Returns a borrow of the account for the `account_id` (if found).
7610    #[must_use]
7611    pub fn account_ref(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
7612        self.accounts
7613            .get(account_id)
7614            .map(|account_cell| AccountRef::new(account_cell.borrow()))
7615    }
7616
7617    /// Returns a borrow of the account for the `account_id` (if found).
7618    ///
7619    /// Prefer [`Self::account_ref`] in new native code.
7620    #[must_use]
7621    pub fn account(&self, account_id: &AccountId) -> Option<AccountRef<'_>> {
7622        self.account_ref(account_id)
7623    }
7624
7625    /// Returns a borrow of the account for the `account_id`.
7626    ///
7627    /// # Errors
7628    ///
7629    /// Returns [`AccountLookupError::NotFound`] when the account is not present in the cache.
7630    pub fn try_account_ref(
7631        &self,
7632        account_id: &AccountId,
7633    ) -> Result<AccountRef<'_>, AccountLookupError> {
7634        self.accounts
7635            .get(account_id)
7636            .map(|account_cell| AccountRef::new(account_cell.borrow()))
7637            .ok_or_else(|| AccountLookupError::not_found(*account_id))
7638    }
7639
7640    /// Returns a borrow of the account for the `account_id`.
7641    ///
7642    /// Prefer [`Self::try_account_ref`] in new native code.
7643    ///
7644    /// # Errors
7645    ///
7646    /// Returns [`AccountLookupError::NotFound`] when the account is not present in the cache.
7647    pub fn try_account(
7648        &self,
7649        account_id: &AccountId,
7650    ) -> Result<AccountRef<'_>, AccountLookupError> {
7651        self.try_account_ref(account_id)
7652    }
7653
7654    /// Gets an exclusive write borrow of the account with the `account_id` (if found).
7655    ///
7656    /// Requires `&mut Cache` so cache writes are reachable only by privileged crates that hold
7657    /// `Rc<RefCell<Cache>>` directly. Adapter-facing code receives [`CacheView`], which only
7658    /// exposes immutable cache borrows and therefore cannot reach this method.
7659    ///
7660    /// While the returned [`AccountRefMut`] is alive, no other read or write of the same account
7661    /// is permitted. Drop the borrow before dispatching events or taking any other cache borrow
7662    /// that may re-enter the same account.
7663    #[must_use]
7664    pub fn account_mut(&mut self, account_id: &AccountId) -> Option<AccountRefMut<'_>> {
7665        self.accounts
7666            .get(account_id)
7667            .map(|account_cell| AccountRefMut::new(account_cell.borrow_mut()))
7668    }
7669
7670    /// Gets an owned snapshot of the account with the `account_id` (if found).
7671    ///
7672    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
7673    /// will not reflect later cache mutations.
7674    #[must_use]
7675    pub fn account_owned(&self, account_id: &AccountId) -> Option<AccountAny> {
7676        self.accounts
7677            .get(account_id)
7678            .map(|account_cell| account_cell.borrow().clone())
7679    }
7680
7681    /// Returns a borrow of the account for the `venue` (if found).
7682    #[must_use]
7683    pub fn account_for_venue(&self, venue: &Venue) -> Option<AccountRef<'_>> {
7684        self.index
7685            .venue_account
7686            .get(venue)
7687            .and_then(|account_id| self.accounts.get(account_id))
7688            .map(|account_cell| AccountRef::new(account_cell.borrow()))
7689    }
7690
7691    /// Returns an owned snapshot of the account for the `venue` (if found).
7692    ///
7693    /// Use when downstream needs an owned [`AccountAny`] that crosses a boundary. The snapshot
7694    /// will not reflect later cache mutations.
7695    #[must_use]
7696    pub fn account_for_venue_owned(&self, venue: &Venue) -> Option<AccountAny> {
7697        self.index
7698            .venue_account
7699            .get(venue)
7700            .and_then(|account_id| self.accounts.get(account_id))
7701            .map(|account_cell| account_cell.borrow().clone())
7702    }
7703
7704    /// Returns a reference to the account ID for the `venue` (if found).
7705    #[must_use]
7706    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
7707        self.index.venue_account.get(venue)
7708    }
7709
7710    /// Returns borrows of all accounts for the `account_id`.
7711    ///
7712    /// Each [`AccountRef`] in the returned vector borrows its underlying cell; mutating any of
7713    /// those accounts while the vector is alive will panic at runtime. Drop the vector before
7714    /// issuing writes.
7715    #[must_use]
7716    pub fn accounts(&self, account_id: &AccountId) -> Vec<AccountRef<'_>> {
7717        self.accounts
7718            .values()
7719            .filter(|account_cell| &account_cell.borrow().id() == account_id)
7720            .map(|account_cell| AccountRef::new(account_cell.borrow()))
7721            .collect()
7722    }
7723
7724    /// Returns owned copies of every account in the cache.
7725    #[must_use]
7726    pub fn accounts_all_owned(&self) -> Vec<AccountAny> {
7727        self.accounts
7728            .values()
7729            .map(|account_cell| account_cell.borrow().clone())
7730            .collect()
7731    }
7732
7733    /// Updates the own order book with an order.
7734    ///
7735    /// This method adds, updates, or removes an order from the own order book
7736    /// based on the order's current state.
7737    ///
7738    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
7739    /// represented in own books.
7740    pub fn update_own_order_book(&mut self, order: &OrderAny) {
7741        if !order.has_price() {
7742            return;
7743        }
7744
7745        let instrument_id = order.instrument_id();
7746
7747        if !self.own_books.contains_key(&instrument_id) {
7748            if order.is_closed() {
7749                return;
7750            }
7751
7752            self.own_books
7753                .insert(instrument_id, OwnOrderBook::new(instrument_id));
7754        }
7755
7756        let Some(own_book) = self.own_books.get_mut(&instrument_id) else {
7757            return;
7758        };
7759
7760        let own_book_order = order.to_own_book_order();
7761
7762        if order.is_closed() {
7763            if let Err(e) = own_book.delete(own_book_order) {
7764                log::debug!(
7765                    "Failed to delete order {} from own book: {e}",
7766                    order.client_order_id(),
7767                );
7768            } else {
7769                log::debug!("Deleted order {} from own book", order.client_order_id());
7770            }
7771        } else {
7772            // Add or update the order in the own book
7773            if let Err(e) = own_book.update(own_book_order) {
7774                log::debug!(
7775                    "Failed to update order {} in own book: {e}; inserting instead",
7776                    order.client_order_id(),
7777                );
7778                own_book.add(own_book_order);
7779            }
7780            log::debug!("Updated order {} in own book", order.client_order_id());
7781        }
7782    }
7783
7784    /// Force removal of an order from own order books and clean up all indexes.
7785    ///
7786    /// This method is used when order event application fails and we need to ensure
7787    /// terminal orders are properly cleaned up from own books and all relevant indexes.
7788    /// Replicates the index cleanup that `update_order` performs for closed orders.
7789    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
7790        let Some(order_cell) = self.orders.get(client_order_id) else {
7791            return;
7792        };
7793        let order = order_cell.borrow();
7794        let instrument_id = order.instrument_id();
7795        let own_book_order = if order.has_price() {
7796            Some(order.to_own_book_order())
7797        } else {
7798            None
7799        };
7800        drop(order);
7801
7802        self.index.orders_open.remove(client_order_id);
7803        self.index.orders_pending_cancel.remove(client_order_id);
7804        self.index.orders_inflight.remove(client_order_id);
7805        self.index.orders_emulated.remove(client_order_id);
7806        self.index.orders_active_local.remove(client_order_id);
7807
7808        if let Some(own_book) = self.own_books.get_mut(&instrument_id)
7809            && let Some(own_book_order) = own_book_order
7810        {
7811            if let Err(e) = own_book.delete(own_book_order) {
7812                log::debug!("Could not force delete {client_order_id} from own book: {e}");
7813            } else {
7814                log::debug!("Force deleted {client_order_id} from own book");
7815            }
7816        }
7817
7818        self.index.orders_closed.insert(*client_order_id);
7819    }
7820
7821    /// Audit all own order books against open and inflight order indexes.
7822    ///
7823    /// Ensures closed orders are removed from own order books. This includes both
7824    /// orders tracked in `orders_open` (`ACCEPTED`, `TRIGGERED`, `PENDING_*`, `PARTIALLY_FILLED`)
7825    /// and `orders_inflight` (`INITIALIZED`, `SUBMITTED`) to prevent false positives
7826    /// during venue latency windows.
7827    pub fn audit_own_order_books(&mut self) {
7828        log::debug!("Starting own books audit");
7829        let start = std::time::Instant::now();
7830
7831        // Build union of open and inflight orders for audit,
7832        // this prevents false positives for SUBMITTED orders during venue latency.
7833        let valid_order_ids: AHashSet<ClientOrderId> = self
7834            .index
7835            .orders_open
7836            .union(&self.index.orders_inflight)
7837            .copied()
7838            .collect();
7839
7840        for own_book in self.own_books.values_mut() {
7841            own_book.audit_open_orders(&valid_order_ids);
7842        }
7843
7844        log::debug!("Completed own books audit in {:?}", start.elapsed());
7845    }
7846}
7847
7848fn parse_position_snapshot_blob_ref(blob_ref: &str) -> anyhow::Result<(PositionId, usize)> {
7849    let Some(rest) = blob_ref.strip_prefix("cache://position-snapshots/") else {
7850        anyhow::bail!("unsupported cache snapshot blob_ref {blob_ref}");
7851    };
7852
7853    let Some((position_id, snapshot_index)) = rest.rsplit_once('/') else {
7854        anyhow::bail!("malformed position snapshot blob_ref {blob_ref}");
7855    };
7856
7857    if position_id.is_empty() {
7858        anyhow::bail!("position snapshot blob_ref {blob_ref} has empty position id");
7859    }
7860
7861    let snapshot_index = snapshot_index.parse::<usize>().map_err(|e| {
7862        anyhow::anyhow!("position snapshot blob_ref {blob_ref} has invalid frame index: {e}")
7863    })?;
7864
7865    Ok((PositionId::new(position_id), snapshot_index))
7866}
7867
7868fn validate_position_snapshot_blob(position_id: &PositionId, blob: &[u8]) -> anyhow::Result<()> {
7869    let snapshot = serde_json::from_slice::<Position>(blob)?;
7870    let expected_prefix = format!("{}-", position_id.as_str());
7871
7872    let Some(snapshot_uuid) = snapshot.id.as_str().strip_prefix(&expected_prefix) else {
7873        anyhow::bail!(
7874            "position snapshot id {} does not match blob_ref position {position_id}",
7875            snapshot.id
7876        );
7877    };
7878
7879    if UUID4::from_str(snapshot_uuid).is_err() {
7880        anyhow::bail!(
7881            "position snapshot id {} does not match blob_ref position {position_id}",
7882            snapshot.id
7883        );
7884    }
7885
7886    Ok(())
7887}