Skip to main content

nautilus_infrastructure/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Redis-backed cache database for the system.
17//!
18//! # Architecture
19//!
20//! Uses two Redis connections with distinct roles:
21//! - **READ** (`self.con`): synchronous queries (`keys`, `read`, `load_all`),
22//!   owned by the main struct.
23//! - **WRITE**: owned by a background task on `get_runtime()`, receives
24//!   commands via an unbounded `tokio::sync::mpsc` channel.
25//!
26//! All write operations (`insert`, `update`, `delete`, `flush`) are routed
27//! through the command channel so they execute on the WRITE connection. This
28//! avoids cross-runtime I/O issues since the WRITE connection is always
29//! created on the Nautilus runtime.
30//!
31//! Synchronous callers (`close`, `flushdb_sync`) use `std::sync::mpsc` reply
32//! channels to block until the background task confirms completion. When
33//! called from the Nautilus runtime itself, `block_in_place` is used
34//! automatically to avoid stalling the worker thread.
35
36use std::{
37    collections::VecDeque,
38    fmt::Debug,
39    ops::ControlFlow,
40    pin::Pin,
41    sync::mpsc::{self, SyncSender},
42    time::Duration,
43};
44
45use ahash::AHashMap;
46use bytes::Bytes;
47use nautilus_common::{
48    cache::{
49        CacheConfig,
50        database::{CacheDatabaseAdapter, CacheMap},
51    },
52    enums::SerializationEncoding,
53    live::get_runtime,
54    logging::{log_task_awaiting, log_task_started, log_task_stopped},
55    signal::Signal,
56};
57use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
58use nautilus_cryptography::providers::install_cryptographic_provider;
59use nautilus_model::{
60    accounts::AccountAny,
61    data::{Bar, CustomData, DataType, FundingRateUpdate, HasTsInit, QuoteTick, TradeTick},
62    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
63    identifiers::{
64        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
65        TraderId, VenueOrderId,
66    },
67    instruments::{InstrumentAny, SyntheticInstrument},
68    orderbook::OrderBook,
69    orders::OrderAny,
70    position::Position,
71    types::Currency,
72};
73use redis::{Pipeline, aio::ConnectionManager};
74use ustr::Ustr;
75
76use super::{REDIS_DELIMITER, REDIS_FLUSHDB, get_index_key};
77use crate::redis::{create_redis_connection, queries::DatabaseQueries};
78
79// Task and connection names
80const CACHE_READ: &str = "cache-read";
81const CACHE_WRITE: &str = "cache-write";
82const CACHE_PROCESS: &str = "cache-process";
83
84// Error constants
85const FAILED_TX_CHANNEL: &str = "Failed to send to channel";
86
87// Collection keys
88const INDEX: &str = "index";
89const GENERAL: &str = "general";
90const CURRENCIES: &str = "currencies";
91const INSTRUMENTS: &str = "instruments";
92const SYNTHETICS: &str = "synthetics";
93const ACCOUNTS: &str = "accounts";
94const ORDERS: &str = "orders";
95const POSITIONS: &str = "positions";
96const ACTORS: &str = "actors";
97const STRATEGIES: &str = "strategies";
98const SNAPSHOTS: &str = "snapshots";
99const HEALTH: &str = "health";
100const CUSTOM: &str = "custom";
101
102// Index keys
103const INDEX_ORDER_IDS: &str = "index:order_ids";
104const INDEX_ORDER_POSITION: &str = "index:order_position";
105const INDEX_ORDER_CLIENT: &str = "index:order_client";
106const INDEX_ORDERS: &str = "index:orders";
107const INDEX_ORDERS_OPEN: &str = "index:orders_open";
108const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
109const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
110const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
111const INDEX_POSITIONS: &str = "index:positions";
112const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
113const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
114
/// A type of database operation.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Insert a value for a key.
    Insert,
    /// Update an existing value for a key.
    Update,
    /// Delete a key, or members within an indexed collection.
    Delete,
    /// Flush the entire database; the sender is signaled once the flush completes.
    Flush(SyncSender<()>),
    /// Shut down the background command-processing task.
    Close,
}
124
/// Represents a database command to be performed which may be executed in a task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The database operation type.
    pub op_type: DatabaseOperation,
    /// The primary key for the operation (`None` for `Close` and `Flush` commands).
    pub key: Option<String>,
    /// The data payload for the operation (`None` is valid for delete operations).
    pub payload: Option<Vec<Bytes>>,
}
135
136impl DatabaseCommand {
137    /// Creates a new [`DatabaseCommand`] instance.
138    #[must_use]
139    pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
140        Self {
141            op_type,
142            key: Some(key),
143            payload,
144        }
145    }
146
147    /// Initialize a `Close` database command, this is meant to close the database cache channel.
148    #[must_use]
149    pub const fn close() -> Self {
150        Self {
151            op_type: DatabaseOperation::Close,
152            key: None,
153            payload: None,
154        }
155    }
156}
157
/// A Redis-backed cache database.
///
/// Owns the synchronous READ connection (`con`); all writes are routed to a
/// background task via `tx` (see module docs).
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// READ connection used for synchronous queries (`keys`, `read`, `read_bulk`).
    pub con: ConnectionManager,
    /// The trader ID this cache database belongs to.
    pub trader_id: TraderId,
    /// Redis key prefix scoping all of this trader's keys.
    pub trader_key: String,
    /// Serialization encoding for stored values.
    pub encoding: SerializationEncoding,
    /// Optional batch size used by `read_bulk`.
    pub bulk_read_batch_size: Option<usize>,
    // Sender side of the command channel to the background WRITE task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    // Join handle for the background task; taken (set to `None`) on `close`.
    handle: Option<tokio::task::JoinHandle<()>>,
}
171
172impl Debug for RedisCacheDatabase {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct(stringify!(RedisCacheDatabase))
175            .field("trader_id", &self.trader_id)
176            .field("encoding", &self.encoding)
177            .finish()
178    }
179}
180
181impl RedisCacheDatabase {
182    /// Creates a new [`RedisCacheDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if:
187    /// - The database configuration is missing in `config`.
188    /// - Establishing the Redis connection fails.
189    /// - The command processing task cannot be spawned.
190    pub async fn new(
191        trader_id: TraderId,
192        instance_id: UUID4,
193        config: CacheConfig,
194    ) -> anyhow::Result<Self> {
195        install_cryptographic_provider();
196
197        let db_config = config
198            .database
199            .as_ref()
200            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
201        let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
202
203        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
204        let trader_key = get_trader_key(trader_id, instance_id, &config);
205        let trader_key_clone = trader_key.clone();
206        let encoding = config.encoding;
207        let bulk_read_batch_size = config.bulk_read_batch_size;
208
209        let handle = get_runtime().spawn(async move {
210            if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
211                log::error!("Error in task '{CACHE_PROCESS}': {e}");
212            }
213        });
214
215        Ok(Self {
216            con,
217            trader_id,
218            trader_key,
219            encoding,
220            bulk_read_batch_size,
221            tx,
222            handle: Some(handle),
223        })
224    }
225
    /// Returns the serialization encoding used for stored values.
    #[must_use]
    pub const fn get_encoding(&self) -> SerializationEncoding {
        self.encoding
    }

    /// Returns the Redis key prefix scoping this trader's keys.
    #[must_use]
    pub fn get_trader_key(&self) -> &str {
        &self.trader_key
    }
235
    /// Closes the database, shutting down the background command-processing task.
    ///
    /// Idempotent: once `handle` has been taken, subsequent calls are no-ops.
    /// Blocks the caller until the background task has finished, using
    /// `block_in_place` when already on the Nautilus runtime (see `blocking_recv`).
    pub fn close(&mut self) {
        log::debug!("Closing");

        // Taking the handle also marks the database as closed.
        let Some(handle) = self.handle.take() else {
            log::debug!("Already closed");
            return;
        };

        // Best-effort: the task may already have stopped if the channel closed.
        if let Err(e) = self.tx.send(DatabaseCommand::close()) {
            log::debug!("Error sending close command: {e:?}");
        }

        log_task_awaiting(CACHE_PROCESS);

        let (tx, rx) = mpsc::sync_channel(1);

        // Await the task handle on the runtime and signal completion back over
        // a std channel so this synchronous caller can block on it safely.
        get_runtime().spawn(async move {
            if let Err(e) = handle.await {
                log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
            }
            let _ = tx.send(());
        });
        let _ = blocking_recv(&rx);

        log::debug!("Closed");
    }
262
263    pub async fn flushdb(&mut self) {
264        if let Err(e) = redis::cmd(REDIS_FLUSHDB)
265            .query_async::<()>(&mut self.con)
266            .await
267        {
268            log::error!("Failed to flush database: {e:?}");
269        }
270    }
271
272    /// Sends a flush command through the background task channel and blocks
273    /// until it completes. Safe to call from any runtime context.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the command channel is closed or the reply is lost.
278    pub fn flushdb_sync(&self) -> anyhow::Result<()> {
279        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
280        let cmd = DatabaseCommand {
281            op_type: DatabaseOperation::Flush(reply_tx),
282            key: None,
283            payload: None,
284        };
285        self.tx
286            .send(cmd)
287            .map_err(|e| anyhow::anyhow!("{FAILED_TX_CHANNEL}: {e}"))?;
288        blocking_recv(&reply_rx).map_err(|e| anyhow::anyhow!("Failed to flush database: {e}"))?;
289        Ok(())
290    }
291
292    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the underlying Redis scan operation fails.
297    pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
298        let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
299        DatabaseQueries::scan_keys(&mut self.con, pattern).await
300    }
301
    /// Reads the value(s) associated with `key` for this trader from Redis.
    ///
    /// Uses the synchronous READ connection (`self.con`).
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying Redis read operation fails.
    pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
        DatabaseQueries::read(&self.con, &self.trader_key, key).await
    }
310
311    /// Reads multiple values using bulk operations for efficiency.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the underlying Redis read operation fails.
316    pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
317        match self.bulk_read_batch_size {
318            Some(batch_size) => {
319                DatabaseQueries::read_bulk_batched(&self.con, keys, batch_size).await
320            }
321            None => DatabaseQueries::read_bulk(&self.con, keys).await,
322        }
323    }
324
    /// Loads custom data from Redis matching the given `data_type` (blocking).
    ///
    /// Spawns the async query on the global Nautilus runtime and blocks until
    /// the result arrives via a channel. Safe from any thread context (Python,
    /// test runtimes, plain threads).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or the reply channel is closed.
    pub fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
        // Clone owned copies so the spawned task can be 'static.
        let con = self.con.clone();
        let trader_key = self.trader_key.clone();
        let data_type = data_type.clone();
        let (tx, rx) = mpsc::channel();

        get_runtime().spawn(async move {
            let result = DatabaseQueries::load_custom_data(&con, &trader_key, &data_type).await;
            if let Err(e) = tx.send(result) {
                log::error!("Failed to send custom data result for '{data_type}': {e:?}");
            }
        });

        // Outer `?` surfaces a dropped/closed channel; the inner value is the
        // query result itself, returned as-is.
        blocking_recv(&rx).map_err(|e| anyhow::anyhow!("load_custom_data channel closed: {e}"))?
    }
349
350    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the command cannot be sent to the background task channel.
355    pub fn insert(&self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
356        let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
357        match self.tx.send(op) {
358            Ok(()) => Ok(()),
359            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
360        }
361    }
362
363    /// Stores custom data in Redis (key format: `custom:<ts_init_020>:<uuid>`, value: full JSON).
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if serialization fails or the insert command cannot be sent.
368    pub fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
369        let json_bytes = serde_json::to_vec(data)
370            .map_err(|e| anyhow::anyhow!("CustomData serialization failed: {e}"))?;
371        let ts_init = data.ts_init().as_u64();
372        let key = format!(
373            "{CUSTOM}{REDIS_DELIMITER}{:020}{REDIS_DELIMITER}{}",
374            ts_init,
375            UUID4::new()
376        );
377        self.insert(key, Some(vec![Bytes::from(json_bytes)]))
378    }
379
380    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the command cannot be sent to the background task channel.
385    pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
386        let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
387        match self.tx.send(op) {
388            Ok(()) => Ok(()),
389            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
390        }
391    }
392
393    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the command cannot be sent to the background task channel.
398    pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
399        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
400        match self.tx.send(op) {
401            Ok(()) => Ok(()),
402            Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
403        }
404    }
405
406    /// Delete the given order from the database with full index cleanup.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if the command cannot be sent to the background task channel.
411    pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
412        let order_id_bytes = Bytes::from(client_order_id.to_string());
413
414        // Delete the order itself
415        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
416        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
417        self.tx
418            .send(op)
419            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
420
421        // Delete from all order indexes
422        let index_keys = [
423            INDEX_ORDER_IDS,
424            INDEX_ORDERS,
425            INDEX_ORDERS_OPEN,
426            INDEX_ORDERS_CLOSED,
427            INDEX_ORDERS_EMULATED,
428            INDEX_ORDERS_INFLIGHT,
429        ];
430
431        for index_key in &index_keys {
432            let key = (*index_key).to_string();
433            let payload = vec![order_id_bytes.clone()];
434            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
435            self.tx
436                .send(op)
437                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
438        }
439
440        // Delete from hash indexes
441        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
442        for index_key in &hash_indexes {
443            let key = (*index_key).to_string();
444            let payload = vec![order_id_bytes.clone()];
445            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
446            self.tx.send(op).map_err(|e| {
447                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
448            })?;
449        }
450
451        Ok(())
452    }
453
454    /// Delete the given position from the database with full index cleanup.
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if the command cannot be sent to the background task channel.
459    pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
460        let position_id_bytes = Bytes::from(position_id.to_string());
461
462        // Delete the position itself
463        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
464        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
465        self.tx
466            .send(op)
467            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
468
469        // Delete from all position indexes
470        let index_keys = [
471            INDEX_POSITIONS,
472            INDEX_POSITIONS_OPEN,
473            INDEX_POSITIONS_CLOSED,
474        ];
475
476        for index_key in &index_keys {
477            let key = (*index_key).to_string();
478            let payload = vec![position_id_bytes.clone()];
479            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
480            self.tx.send(op).map_err(|e| {
481                anyhow::anyhow!("Failed to send delete position index command: {e}")
482            })?;
483        }
484
485        Ok(())
486    }
487
    /// Delete the given account event from the database.
    ///
    /// Currently a deliberate no-op (logs a warning and returns `Ok`); the
    /// parameters are unused pending a redesign of account event storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the command cannot be sent to the background task channel.
    pub fn delete_account_event(
        &self,
        _account_id: &AccountId,
        _event_id: &str,
    ) -> anyhow::Result<()> {
        log::warn!("Deleting account events currently a no-op (pending redesign)");
        Ok(())
    }
501}
502
503fn blocking_recv<T>(rx: &mpsc::Receiver<T>) -> Result<T, mpsc::RecvError> {
504    let on_nautilus_runtime = tokio::runtime::Handle::try_current()
505        .ok()
506        .is_some_and(|h| h.id() == get_runtime().handle().id());
507
508    if on_nautilus_runtime {
509        tokio::task::block_in_place(|| rx.recv())
510    } else {
511        rx.recv()
512    }
513}
514
/// Background task: receives [`DatabaseCommand`]s over `rx` and applies them
/// to Redis via the dedicated WRITE connection.
///
/// Commands are buffered and drained either immediately after each message
/// (when `buffer_interval` is zero) or on a periodic flush timer. Runs until
/// the channel closes or a `Close` command is received, draining any
/// remaining buffered commands before returning.
///
/// # Errors
///
/// Returns an error if the database config is missing or the WRITE
/// connection cannot be established.
async fn process_commands(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
    trader_key: String,
    config: CacheConfig,
) -> anyhow::Result<()> {
    log_task_started(CACHE_PROCESS);

    let db_config = config
        .database
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
    let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;

    // Buffering
    let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
    // NOTE(review): assumes `buffer_interval_ms` fits in u64 without loss;
    // the `as u64` cast would silently truncate otherwise — confirm the
    // config field's type.
    let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);

    // A sleep used to trigger periodic flushing of the buffer.
    // When `buffer_interval` is zero we skip using the timer and flush immediately
    // after every message.
    let flush_timer = tokio::time::sleep(buffer_interval);
    tokio::pin!(flush_timer);

    // Continue to receive and handle messages until channel is hung up
    loop {
        tokio::select! {
            maybe_cmd = rx.recv() => {
                let result = handle_command(
                    maybe_cmd,
                    &mut buffer,
                    buffer_interval,
                    &mut con,
                    &trader_key,
                ).await;

                // `Break` signals shutdown: channel closed or `Close` received.
                if result.is_break() {
                    break;
                }
            }
            // The timer branch is disabled entirely for zero intervals.
            () = &mut flush_timer, if !buffer_interval.is_zero() => {
                flush_buffer(&mut buffer, &mut con, &trader_key, &mut flush_timer, buffer_interval).await;
            }
        }
    }

    // Drain any remaining messages
    if !buffer.is_empty() {
        drain_buffer(&mut con, &trader_key, &mut buffer).await;
    }

    log_task_stopped(CACHE_PROCESS);
    Ok(())
}
568
/// Handles a single received command (or channel closure) for the
/// command-processing loop.
///
/// Returns [`ControlFlow::Break`] to stop the loop (channel closed or `Close`
/// received). `Close` and `Flush` are serviced inline; all other commands are
/// buffered, and with a zero `buffer_interval` the buffer is drained
/// immediately after each command.
async fn handle_command(
    maybe_cmd: Option<DatabaseCommand>,
    buffer: &mut VecDeque<DatabaseCommand>,
    buffer_interval: Duration,
    con: &mut ConnectionManager,
    trader_key: &str,
) -> ControlFlow<()> {
    // `None` means every sender has been dropped: shut down.
    let Some(cmd) = maybe_cmd else {
        log::debug!("Command channel closed");
        return ControlFlow::Break(());
    };

    log::trace!("Received {cmd:?}");

    match cmd.op_type {
        DatabaseOperation::Close => {
            // Apply any pending writes before shutting down.
            if !buffer.is_empty() {
                drain_buffer(con, trader_key, buffer).await;
            }
            return ControlFlow::Break(());
        }
        DatabaseOperation::Flush(reply_tx) => {
            // Drain buffered commands first so the flush operates on a fully
            // applied command stream.
            if !buffer.is_empty() {
                drain_buffer(con, trader_key, buffer).await;
            }

            if let Err(e) = redis::cmd(REDIS_FLUSHDB).query_async::<()>(con).await {
                log::error!("Failed to flush database: {e:?}");
            }
            // Unblock the synchronous caller (see `flushdb_sync`); ignore a
            // dropped receiver.
            let _ = reply_tx.send(());
            return ControlFlow::Continue(());
        }
        _ => {}
    }

    buffer.push_back(cmd);

    // Zero interval => no timer running; drain after every command.
    if buffer_interval.is_zero() {
        drain_buffer(con, trader_key, buffer).await;
    }

    ControlFlow::Continue(())
}
612
613async fn flush_buffer(
614    buffer: &mut VecDeque<DatabaseCommand>,
615    con: &mut ConnectionManager,
616    trader_key: &str,
617    flush_timer: &mut Pin<&mut tokio::time::Sleep>,
618    buffer_interval: Duration,
619) {
620    if !buffer.is_empty() {
621        drain_buffer(con, trader_key, buffer).await;
622    }
623    flush_timer
624        .as_mut()
625        .reset(tokio::time::Instant::now() + buffer_interval);
626}
627
628async fn drain_buffer(
629    conn: &mut ConnectionManager,
630    trader_key: &str,
631    buffer: &mut VecDeque<DatabaseCommand>,
632) {
633    let mut pipe = redis::pipe();
634    pipe.atomic();
635
636    for msg in buffer.drain(..) {
637        let key = if let Some(key) = msg.key {
638            key
639        } else {
640            log::error!("Null key found for message: {msg:?}");
641            continue;
642        };
643        let collection = match get_collection_key(&key) {
644            Ok(collection) => collection,
645            Err(e) => {
646                log::error!("{e}");
647                continue; // Continue to next message
648            }
649        };
650
651        let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
652
653        match msg.op_type {
654            DatabaseOperation::Insert => {
655                if let Some(payload) = msg.payload {
656                    log::debug!("Processing INSERT for collection: {collection}, key: {key}");
657                    if let Err(e) = insert(&mut pipe, collection, &key, &payload) {
658                        log::error!("{e}");
659                    }
660                } else {
661                    log::error!("Null `payload` for `insert`");
662                }
663            }
664            DatabaseOperation::Update => {
665                if let Some(payload) = msg.payload {
666                    log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
667                    if let Err(e) = update(&mut pipe, collection, &key, &payload) {
668                        log::error!("{e}");
669                    }
670                } else {
671                    log::error!("Null `payload` for `update`");
672                }
673            }
674            DatabaseOperation::Delete => {
675                log::debug!(
676                    "Processing DELETE for collection: {}, key: {}, payload: {:?}",
677                    collection,
678                    key,
679                    msg.payload.as_ref().map(std::vec::Vec::len)
680                );
681                // `payload` can be `None` for a delete operation
682                if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
683                    log::error!("{e}");
684                }
685            }
686            DatabaseOperation::Close => panic!("Close command should not be drained"),
687            DatabaseOperation::Flush(_) => panic!("Flush command should not be drained"),
688        }
689    }
690
691    if let Err(e) = pipe.query_async::<()>(conn).await {
692        log::error!("{e}");
693    }
694}
695
696fn insert(pipe: &mut Pipeline, collection: &str, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
697    check_slice_not_empty(value, stringify!(value))?;
698
699    match collection {
700        INDEX => insert_index(pipe, key, value),
701        GENERAL => {
702            insert_string(pipe, key, value[0].as_ref());
703            Ok(())
704        }
705        CURRENCIES => {
706            insert_string(pipe, key, value[0].as_ref());
707            Ok(())
708        }
709        INSTRUMENTS => {
710            insert_string(pipe, key, value[0].as_ref());
711            Ok(())
712        }
713        SYNTHETICS => {
714            insert_string(pipe, key, value[0].as_ref());
715            Ok(())
716        }
717        ACCOUNTS => {
718            insert_list(pipe, key, value[0].as_ref());
719            Ok(())
720        }
721        ORDERS => {
722            insert_list(pipe, key, value[0].as_ref());
723            Ok(())
724        }
725        POSITIONS => {
726            insert_list(pipe, key, value[0].as_ref());
727            Ok(())
728        }
729        ACTORS => {
730            insert_string(pipe, key, value[0].as_ref());
731            Ok(())
732        }
733        STRATEGIES => {
734            insert_string(pipe, key, value[0].as_ref());
735            Ok(())
736        }
737        SNAPSHOTS => {
738            insert_list(pipe, key, value[0].as_ref());
739            Ok(())
740        }
741        HEALTH => {
742            insert_string(pipe, key, value[0].as_ref());
743            Ok(())
744        }
745        CUSTOM => {
746            insert_string(pipe, key, value[0].as_ref());
747            Ok(())
748        }
749        _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
750    }
751}
752
753fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
754    let index_key = get_index_key(key)?;
755    match index_key {
756        INDEX_ORDER_IDS => {
757            insert_set(pipe, key, value[0].as_ref());
758            Ok(())
759        }
760        INDEX_ORDER_POSITION => {
761            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
762            Ok(())
763        }
764        INDEX_ORDER_CLIENT => {
765            insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
766            Ok(())
767        }
768        INDEX_ORDERS => {
769            insert_set(pipe, key, value[0].as_ref());
770            Ok(())
771        }
772        INDEX_ORDERS_OPEN => {
773            insert_set(pipe, key, value[0].as_ref());
774            Ok(())
775        }
776        INDEX_ORDERS_CLOSED => {
777            insert_set(pipe, key, value[0].as_ref());
778            Ok(())
779        }
780        INDEX_ORDERS_EMULATED => {
781            insert_set(pipe, key, value[0].as_ref());
782            Ok(())
783        }
784        INDEX_ORDERS_INFLIGHT => {
785            insert_set(pipe, key, value[0].as_ref());
786            Ok(())
787        }
788        INDEX_POSITIONS => {
789            insert_set(pipe, key, value[0].as_ref());
790            Ok(())
791        }
792        INDEX_POSITIONS_OPEN => {
793            insert_set(pipe, key, value[0].as_ref());
794            Ok(())
795        }
796        INDEX_POSITIONS_CLOSED => {
797            insert_set(pipe, key, value[0].as_ref());
798            Ok(())
799        }
800        _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
801    }
802}
803
/// Queues a Redis `SET` of `value` at `key` on the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}

/// Queues a Redis `SADD` of `value` into the set at `key` on the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}

/// Queues a Redis `HSET` of field `name` to `value` in the hash at `key`.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}

/// Queues a Redis `RPUSH` of `value` onto the list at `key` on the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
819
820fn update(pipe: &mut Pipeline, collection: &str, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
821    check_slice_not_empty(value, stringify!(value))?;
822
823    match collection {
824        ACCOUNTS => {
825            update_list(pipe, key, value[0].as_ref());
826            Ok(())
827        }
828        ORDERS => {
829            update_list(pipe, key, value[0].as_ref());
830            Ok(())
831        }
832        POSITIONS => {
833            update_list(pipe, key, value[0].as_ref());
834            Ok(())
835        }
836        _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
837    }
838}
839
/// Queues a Redis `RPUSHX` (append only if `key` already exists) on the pipeline.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
843
844fn delete(
845    pipe: &mut Pipeline,
846    collection: &str,
847    key: &str,
848    value: Option<Vec<Bytes>>,
849) -> anyhow::Result<()> {
850    log::debug!(
851        "delete: collection={}, key={}, has_payload={}",
852        collection,
853        key,
854        value.is_some()
855    );
856
857    match collection {
858        INDEX => delete_from_index(pipe, key, value),
859        ORDERS => {
860            delete_string(pipe, key);
861            Ok(())
862        }
863        POSITIONS => {
864            delete_string(pipe, key);
865            Ok(())
866        }
867        ACCOUNTS => {
868            delete_string(pipe, key);
869            Ok(())
870        }
871        ACTORS => {
872            delete_string(pipe, key);
873            Ok(())
874        }
875        STRATEGIES => {
876            delete_string(pipe, key);
877            Ok(())
878        }
879        _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
880    }
881}
882
883fn delete_from_index(
884    pipe: &mut Pipeline,
885    key: &str,
886    value: Option<Vec<Bytes>>,
887) -> anyhow::Result<()> {
888    let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
889    let index_key = get_index_key(key)?;
890
891    match index_key {
892        INDEX_ORDER_IDS => {
893            remove_from_set(pipe, key, value[0].as_ref());
894            Ok(())
895        }
896        INDEX_ORDER_POSITION => {
897            remove_from_hash(pipe, key, value[0].as_ref());
898            Ok(())
899        }
900        INDEX_ORDER_CLIENT => {
901            remove_from_hash(pipe, key, value[0].as_ref());
902            Ok(())
903        }
904        INDEX_ORDERS => {
905            remove_from_set(pipe, key, value[0].as_ref());
906            Ok(())
907        }
908        INDEX_ORDERS_OPEN => {
909            remove_from_set(pipe, key, value[0].as_ref());
910            Ok(())
911        }
912        INDEX_ORDERS_CLOSED => {
913            remove_from_set(pipe, key, value[0].as_ref());
914            Ok(())
915        }
916        INDEX_ORDERS_EMULATED => {
917            remove_from_set(pipe, key, value[0].as_ref());
918            Ok(())
919        }
920        INDEX_ORDERS_INFLIGHT => {
921            remove_from_set(pipe, key, value[0].as_ref());
922            Ok(())
923        }
924        INDEX_POSITIONS => {
925            remove_from_set(pipe, key, value[0].as_ref());
926            Ok(())
927        }
928        INDEX_POSITIONS_OPEN => {
929            remove_from_set(pipe, key, value[0].as_ref());
930            Ok(())
931        }
932        INDEX_POSITIONS_CLOSED => {
933            remove_from_set(pipe, key, value[0].as_ref());
934            Ok(())
935        }
936        _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
937    }
938}
939
/// Removes `member` from the Redis set at `key` (SREM).
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
943
/// Deletes `field` from the Redis hash at `key` (HDEL).
fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
    pipe.hdel(key, field);
}
947
/// Deletes the entire key from Redis (DEL).
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
951
952fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
953    let mut key = String::new();
954
955    if config.use_trader_prefix {
956        key.push_str("trader-");
957    }
958
959    key.push_str(trader_id.as_str());
960
961    if config.use_instance_id {
962        key.push(REDIS_DELIMITER);
963        key.push_str(&format!("{instance_id}"));
964    }
965
966    key
967}
968
969fn get_collection_key(key: &str) -> anyhow::Result<&str> {
970    key.split_once(REDIS_DELIMITER)
971        .map(|(collection, _)| collection)
972        .ok_or_else(|| {
973            anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
974        })
975}
976
/// Adapts [`RedisCacheDatabase`] to the `CacheDatabaseAdapter` trait, pairing the
/// database handle with the payload serialization encoding.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    // Encoding used when serializing/deserializing cached payloads
    pub encoding: SerializationEncoding,
    // Underlying Redis database (READ connection + WRITE command channel)
    pub database: RedisCacheDatabase,
}
983
// Many methods are not yet implemented (`todo!()`) or explicitly unsupported
// (`bail!`); reads go through the READ connection via `DatabaseQueries`, while
// writes/deletes are sent to the background WRITE task over the command channel.
#[allow(dead_code)]
#[allow(unused)]
#[async_trait::async_trait]
impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
    /// Closes the underlying database (shuts down the background write task).
    fn close(&mut self) -> anyhow::Result<()> {
        self.database.close();
        Ok(())
    }

    /// Flushes the Redis database, blocking until the background task confirms.
    fn flush(&mut self) -> anyhow::Result<()> {
        self.database.flushdb_sync()
    }

    /// Loads all cached domain data concurrently.
    ///
    /// Uses `try_join!` so the first failing load aborts the whole operation.
    async fn load_all(&self) -> anyhow::Result<CacheMap> {
        log::debug!("Loading all data");

        let (
            currencies,
            instruments,
            synthetics,
            accounts,
            orders,
            positions,
            greeks,
            yield_curves,
        ) = tokio::try_join!(
            self.load_currencies(),
            self.load_instruments(),
            self.load_synthetics(),
            self.load_accounts(),
            self.load_orders(),
            self.load_positions(),
            self.load_greeks(),
            self.load_yield_curves()
        )
        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;

        Ok(CacheMap {
            currencies,
            instruments,
            synthetics,
            accounts,
            orders,
            positions,
            greeks,
            yield_curves,
        })
    }

    /// Loads the raw general key-value store (currently stubbed to empty).
    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
        // self.database.load()
        Ok(AHashMap::new()) // TODO
    }

    /// Loads all currencies keyed by currency code.
    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
        DatabaseQueries::load_currencies(
            &self.database.con,
            &self.database.trader_key,
            self.encoding,
        )
        .await
    }

    /// Loads all instruments keyed by instrument ID.
    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
        DatabaseQueries::load_instruments(
            &self.database.con,
            &self.database.trader_key,
            self.encoding,
        )
        .await
    }

    /// Loads all synthetic instruments keyed by instrument ID.
    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
        DatabaseQueries::load_synthetics(
            &self.database.con,
            &self.database.trader_key,
            self.encoding,
        )
        .await
    }

    /// Loads all accounts keyed by account ID.
    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
        DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
            .await
    }

    /// Loads all orders keyed by client order ID.
    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
        DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
            .await
    }

    /// Loads all positions keyed by position ID.
    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
        DatabaseQueries::load_positions(
            &self.database.con,
            &self.database.trader_key,
            self.encoding,
        )
        .await
    }

    /// Loads the client-order-ID -> position index (not yet implemented).
    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
        todo!()
    }

    /// Loads the client-order-ID -> client-ID index (not yet implemented).
    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
        todo!()
    }

    /// Loads a single currency by code, if present.
    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
        DatabaseQueries::load_currency(
            &self.database.con,
            &self.database.trader_key,
            code,
            self.encoding,
        )
        .await
    }

    /// Loads a single instrument by ID, if present.
    async fn load_instrument(
        &self,
        instrument_id: &InstrumentId,
    ) -> anyhow::Result<Option<InstrumentAny>> {
        DatabaseQueries::load_instrument(
            &self.database.con,
            &self.database.trader_key,
            instrument_id,
            self.encoding,
        )
        .await
    }

    /// Loads a single synthetic instrument by ID, if present.
    async fn load_synthetic(
        &self,
        instrument_id: &InstrumentId,
    ) -> anyhow::Result<Option<SyntheticInstrument>> {
        DatabaseQueries::load_synthetic(
            &self.database.con,
            &self.database.trader_key,
            instrument_id,
            self.encoding,
        )
        .await
    }

    /// Loads a single account by ID, if present.
    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
        DatabaseQueries::load_account(
            &self.database.con,
            &self.database.trader_key,
            account_id,
            self.encoding,
        )
        .await
    }

    /// Loads a single order by client order ID, if present.
    async fn load_order(
        &self,
        client_order_id: &ClientOrderId,
    ) -> anyhow::Result<Option<OrderAny>> {
        DatabaseQueries::load_order(
            &self.database.con,
            &self.database.trader_key,
            client_order_id,
            self.encoding,
        )
        .await
    }

    /// Loads a single position by ID, if present.
    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
        DatabaseQueries::load_position(
            &self.database.con,
            &self.database.trader_key,
            position_id,
            self.encoding,
        )
        .await
    }

    /// Loads actor state (not yet implemented).
    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }

    /// Loads strategy state (not yet implemented).
    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }

    /// Not supported by this adapter.
    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
        anyhow::bail!("Loading signals from Redis cache adapter not supported")
    }

    /// Loads custom data matching `data_type` via the underlying database.
    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
        self.database.load_custom_data(data_type)
    }

    /// Not supported by this adapter.
    fn load_order_snapshot(
        &self,
        client_order_id: &ClientOrderId,
    ) -> anyhow::Result<Option<OrderSnapshot>> {
        anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn load_position_snapshot(
        &self,
        position_id: &PositionId,
    ) -> anyhow::Result<Option<PositionSnapshot>> {
        anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
        anyhow::bail!("Loading quote data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
        anyhow::bail!("Loading market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn load_funding_rates(
        &self,
        instrument_id: &InstrumentId,
    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
        anyhow::bail!("Loading market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
        anyhow::bail!("Loading market data for Redis cache adapter not supported")
    }

    // ---- Add operations (mostly not yet implemented) ----------------------

    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
        todo!()
    }

    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
        todo!()
    }

    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
        todo!()
    }

    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
        todo!()
    }

    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
        todo!()
    }

    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
        todo!()
    }

    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
        todo!()
    }

    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }

    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
        todo!()
    }

    /// Not supported by this adapter.
    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
        anyhow::bail!("Saving market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
        anyhow::bail!("Saving signals for Redis cache adapter not supported")
    }

    /// Serializes `data` as JSON and inserts it under a time-ordered unique key.
    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
        let json_bytes = serde_json::to_vec(data)
            .map_err(|e| anyhow::anyhow!("CustomData serialization failed: {e}"))?;
        let ts_init = data.ts_init().as_u64();
        // Zero-padded fixed-width timestamp makes keys sort chronologically;
        // the UUID suffix guarantees uniqueness for identical timestamps
        let key = format!(
            "{CUSTOM}{REDIS_DELIMITER}{:020}{REDIS_DELIMITER}{}",
            ts_init,
            UUID4::new()
        );
        self.database
            .insert(key, Some(vec![Bytes::from(json_bytes)]))
    }

    /// Not supported by this adapter.
    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
        anyhow::bail!("Saving market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
        anyhow::bail!("Saving market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn add_funding_rate(&self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
        anyhow::bail!("Saving market data for Redis cache adapter not supported")
    }

    /// Not supported by this adapter.
    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
        anyhow::bail!("Saving market data for Redis cache adapter not supported")
    }

    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
        todo!()
    }

    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
        todo!()
    }

    /// Deletes an order and removes it from every order index.
    ///
    /// Each deletion is sent as a separate command to the background WRITE
    /// task; sends are fire-and-forget (no reply channel), so this returns
    /// once all commands are queued, not once Redis has applied them.
    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
        let order_id_bytes = Bytes::from(client_order_id.to_string());

        log::debug!("Deleting order: {client_order_id} from Redis");
        log::debug!("Trader key: {}", self.database.trader_key);

        // Delete the order itself
        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
        log::debug!("Deleting order key: {key}");
        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
        self.database
            .tx
            .send(op)
            .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;

        // Delete from all order indexes (set-backed)
        let index_keys = [
            INDEX_ORDER_IDS,
            INDEX_ORDERS,
            INDEX_ORDERS_OPEN,
            INDEX_ORDERS_CLOSED,
            INDEX_ORDERS_EMULATED,
            INDEX_ORDERS_INFLIGHT,
        ];

        for index_key in &index_keys {
            let key = (*index_key).to_string();
            log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
            // Payload identifies the member to remove from the index
            let payload = vec![order_id_bytes.clone()];
            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
            self.database
                .tx
                .send(op)
                .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
        }

        // Delete from hash indexes (field -> value mappings)
        let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
        for index_key in &hash_indexes {
            let key = (*index_key).to_string();
            log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
            let payload = vec![order_id_bytes.clone()];
            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
            self.database.tx.send(op).map_err(|e| {
                anyhow::anyhow!("Failed to send delete order hash index command: {e}")
            })?;
        }

        log::debug!("Sent all delete commands for order: {client_order_id}");
        Ok(())
    }

    /// Deletes a position and removes it from every position index.
    ///
    /// Same fire-and-forget command-channel semantics as `delete_order`.
    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
        let position_id_bytes = Bytes::from(position_id.to_string());

        // Delete the position itself
        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
        let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
        self.database
            .tx
            .send(op)
            .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;

        // Delete from all position indexes
        let index_keys = [
            INDEX_POSITIONS,
            INDEX_POSITIONS_OPEN,
            INDEX_POSITIONS_CLOSED,
        ];

        for index_key in &index_keys {
            let key = (*index_key).to_string();
            let payload = vec![position_id_bytes.clone()];
            let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
            self.database.tx.send(op).map_err(|e| {
                anyhow::anyhow!("Failed to send delete position index command: {e}")
            })?;
        }

        Ok(())
    }

    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
        todo!()
    }

    fn index_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> anyhow::Result<()> {
        todo!()
    }

    fn index_order_position(
        &self,
        client_order_id: ClientOrderId,
        position_id: PositionId,
    ) -> anyhow::Result<()> {
        todo!()
    }

    fn update_actor(&self) -> anyhow::Result<()> {
        todo!()
    }

    fn update_strategy(&self) -> anyhow::Result<()> {
        todo!()
    }

    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
        todo!()
    }

    fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
        todo!()
    }

    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }

    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
        todo!()
    }

    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }

    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
        todo!()
    }
}
1425
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        // Relies on the default config enabling the "trader-" prefix
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };
        let instance_id = UUID4::new();

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(&instance_id.to_string()));
    }

    #[rstest]
    fn test_get_collection_key_valid() {
        assert_eq!(get_collection_key("collection:123").unwrap(), "collection");
    }

    #[rstest]
    fn test_get_collection_key_invalid() {
        assert!(get_collection_key("no_delimiter").is_err());
    }
}