1use std::{
37 collections::VecDeque,
38 fmt::Debug,
39 ops::ControlFlow,
40 pin::Pin,
41 sync::mpsc::{self, SyncSender},
42 time::Duration,
43};
44
45use ahash::AHashMap;
46use bytes::Bytes;
47use nautilus_common::{
48 cache::{
49 CacheConfig,
50 database::{CacheDatabaseAdapter, CacheMap},
51 },
52 enums::SerializationEncoding,
53 live::get_runtime,
54 logging::{log_task_awaiting, log_task_started, log_task_stopped},
55 signal::Signal,
56};
57use nautilus_core::{UUID4, UnixNanos, correctness::check_slice_not_empty};
58use nautilus_cryptography::providers::install_cryptographic_provider;
59use nautilus_model::{
60 accounts::AccountAny,
61 data::{Bar, CustomData, DataType, FundingRateUpdate, HasTsInit, QuoteTick, TradeTick},
62 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
63 identifiers::{
64 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
65 TraderId, VenueOrderId,
66 },
67 instruments::{InstrumentAny, SyntheticInstrument},
68 orderbook::OrderBook,
69 orders::OrderAny,
70 position::Position,
71 types::Currency,
72};
73use redis::{Pipeline, aio::ConnectionManager};
74use ustr::Ustr;
75
76use super::{REDIS_DELIMITER, REDIS_FLUSHDB, get_index_key};
77use crate::redis::{create_redis_connection, queries::DatabaseQueries};
78
// Labels passed to `create_redis_connection` for the read-side and write-side connections.
const CACHE_READ: &str = "cache-read";
const CACHE_WRITE: &str = "cache-write";
// Task name used when logging the lifecycle of the command-processing task.
const CACHE_PROCESS: &str = "cache-process";

// Error message prefix used when a command cannot be sent to the processing task.
const FAILED_TX_CHANNEL: &str = "Failed to send to channel";

// Collection names: the key segment before the first delimiter, used to
// dispatch `insert`, `update` and `delete` operations.
const INDEX: &str = "index";
const GENERAL: &str = "general";
const CURRENCIES: &str = "currencies";
const INSTRUMENTS: &str = "instruments";
const SYNTHETICS: &str = "synthetics";
const ACCOUNTS: &str = "accounts";
const ORDERS: &str = "orders";
const POSITIONS: &str = "positions";
const ACTORS: &str = "actors";
const STRATEGIES: &str = "strategies";
const SNAPSHOTS: &str = "snapshots";
const HEALTH: &str = "health";
const CUSTOM: &str = "custom";

// Index keys recognized by `insert_index` and `delete_from_index`.
const INDEX_ORDER_IDS: &str = "index:order_ids";
const INDEX_ORDER_POSITION: &str = "index:order_position";
const INDEX_ORDER_CLIENT: &str = "index:order_client";
const INDEX_ORDERS: &str = "index:orders";
const INDEX_ORDERS_OPEN: &str = "index:orders_open";
const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
const INDEX_POSITIONS: &str = "index:positions";
const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
114
/// Operations which can be queued for the background command-processing task.
#[derive(Clone, Debug)]
pub enum DatabaseOperation {
    /// Insert the command payload under the command key.
    Insert,
    /// Update the value stored under the command key with the command payload.
    Update,
    /// Delete the command key, or for index keys remove the payload member from it.
    Delete,
    /// Flush the database; the carried sender is signalled once the flush has been attempted.
    Flush(SyncSender<()>),
    /// Drain any buffered commands and stop the processing task.
    Close,
}
124
/// A single command sent over the channel to the command-processing task.
#[derive(Clone, Debug)]
pub struct DatabaseCommand {
    /// The operation to perform.
    pub op_type: DatabaseOperation,
    /// The key to operate on (without the trader key prefix); `None` for `Flush` and `Close`.
    pub key: Option<String>,
    /// The optional payload for the operation.
    pub payload: Option<Vec<Bytes>>,
}
135
136impl DatabaseCommand {
137 #[must_use]
139 pub const fn new(op_type: DatabaseOperation, key: String, payload: Option<Vec<Bytes>>) -> Self {
140 Self {
141 op_type,
142 key: Some(key),
143 payload,
144 }
145 }
146
147 #[must_use]
149 pub const fn close() -> Self {
150 Self {
151 op_type: DatabaseOperation::Close,
152 key: None,
153 payload: None,
154 }
155 }
156}
157
/// Redis-backed cache database.
///
/// Query methods use the `con` connection directly, while insert/update/delete
/// commands are sent over a channel to a background task spawned in `new`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisCacheDatabase {
    /// Connection used by this type's query methods (created with the `cache-read` label).
    pub con: ConnectionManager,
    /// The trader ID this database was created for.
    pub trader_id: TraderId,
    /// Key prefix built by `get_trader_key` and prepended to every key.
    pub trader_key: String,
    /// The serialization encoding taken from the cache config.
    pub encoding: SerializationEncoding,
    /// Optional batch size for bulk reads; `None` reads all keys in a single call.
    pub bulk_read_batch_size: Option<usize>,
    // Sender half of the command channel consumed by the processing task
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseCommand>,
    // Join handle of the processing task; taken (set to `None`) by `close`
    handle: Option<tokio::task::JoinHandle<()>>,
}
171
172impl Debug for RedisCacheDatabase {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct(stringify!(RedisCacheDatabase))
175 .field("trader_id", &self.trader_id)
176 .field("encoding", &self.encoding)
177 .finish()
178 }
179}
180
181impl RedisCacheDatabase {
182 pub async fn new(
191 trader_id: TraderId,
192 instance_id: UUID4,
193 config: CacheConfig,
194 ) -> anyhow::Result<Self> {
195 install_cryptographic_provider();
196
197 let db_config = config
198 .database
199 .as_ref()
200 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
201 let con = create_redis_connection(CACHE_READ, db_config.clone()).await?;
202
203 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseCommand>();
204 let trader_key = get_trader_key(trader_id, instance_id, &config);
205 let trader_key_clone = trader_key.clone();
206 let encoding = config.encoding;
207 let bulk_read_batch_size = config.bulk_read_batch_size;
208
209 let handle = get_runtime().spawn(async move {
210 if let Err(e) = process_commands(rx, trader_key_clone, config.clone()).await {
211 log::error!("Error in task '{CACHE_PROCESS}': {e}");
212 }
213 });
214
215 Ok(Self {
216 con,
217 trader_id,
218 trader_key,
219 encoding,
220 bulk_read_batch_size,
221 tx,
222 handle: Some(handle),
223 })
224 }
225
226 #[must_use]
227 pub const fn get_encoding(&self) -> SerializationEncoding {
228 self.encoding
229 }
230
231 #[must_use]
232 pub fn get_trader_key(&self) -> &str {
233 &self.trader_key
234 }
235
236 pub fn close(&mut self) {
237 log::debug!("Closing");
238
239 let Some(handle) = self.handle.take() else {
240 log::debug!("Already closed");
241 return;
242 };
243
244 if let Err(e) = self.tx.send(DatabaseCommand::close()) {
245 log::debug!("Error sending close command: {e:?}");
246 }
247
248 log_task_awaiting(CACHE_PROCESS);
249
250 let (tx, rx) = mpsc::sync_channel(1);
251
252 get_runtime().spawn(async move {
253 if let Err(e) = handle.await {
254 log::error!("Error awaiting task '{CACHE_PROCESS}': {e:?}");
255 }
256 let _ = tx.send(());
257 });
258 let _ = blocking_recv(&rx);
259
260 log::debug!("Closed");
261 }
262
263 pub async fn flushdb(&mut self) {
264 if let Err(e) = redis::cmd(REDIS_FLUSHDB)
265 .query_async::<()>(&mut self.con)
266 .await
267 {
268 log::error!("Failed to flush database: {e:?}");
269 }
270 }
271
272 pub fn flushdb_sync(&self) -> anyhow::Result<()> {
279 let (reply_tx, reply_rx) = mpsc::sync_channel(1);
280 let cmd = DatabaseCommand {
281 op_type: DatabaseOperation::Flush(reply_tx),
282 key: None,
283 payload: None,
284 };
285 self.tx
286 .send(cmd)
287 .map_err(|e| anyhow::anyhow!("{FAILED_TX_CHANNEL}: {e}"))?;
288 blocking_recv(&reply_rx).map_err(|e| anyhow::anyhow!("Failed to flush database: {e}"))?;
289 Ok(())
290 }
291
292 pub async fn keys(&mut self, pattern: &str) -> anyhow::Result<Vec<String>> {
298 let pattern = format!("{}{REDIS_DELIMITER}{pattern}", self.trader_key);
299 DatabaseQueries::scan_keys(&mut self.con, pattern).await
300 }
301
302 pub async fn read(&mut self, key: &str) -> anyhow::Result<Vec<Bytes>> {
308 DatabaseQueries::read(&self.con, &self.trader_key, key).await
309 }
310
311 pub async fn read_bulk(&mut self, keys: &[String]) -> anyhow::Result<Vec<Option<Bytes>>> {
317 match self.bulk_read_batch_size {
318 Some(batch_size) => {
319 DatabaseQueries::read_bulk_batched(&self.con, keys, batch_size).await
320 }
321 None => DatabaseQueries::read_bulk(&self.con, keys).await,
322 }
323 }
324
325 pub fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
335 let con = self.con.clone();
336 let trader_key = self.trader_key.clone();
337 let data_type = data_type.clone();
338 let (tx, rx) = mpsc::channel();
339
340 get_runtime().spawn(async move {
341 let result = DatabaseQueries::load_custom_data(&con, &trader_key, &data_type).await;
342 if let Err(e) = tx.send(result) {
343 log::error!("Failed to send custom data result for '{data_type}': {e:?}");
344 }
345 });
346
347 blocking_recv(&rx).map_err(|e| anyhow::anyhow!("load_custom_data channel closed: {e}"))?
348 }
349
350 pub fn insert(&self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
356 let op = DatabaseCommand::new(DatabaseOperation::Insert, key, payload);
357 match self.tx.send(op) {
358 Ok(()) => Ok(()),
359 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
360 }
361 }
362
363 pub fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
369 let json_bytes = serde_json::to_vec(data)
370 .map_err(|e| anyhow::anyhow!("CustomData serialization failed: {e}"))?;
371 let ts_init = data.ts_init().as_u64();
372 let key = format!(
373 "{CUSTOM}{REDIS_DELIMITER}{:020}{REDIS_DELIMITER}{}",
374 ts_init,
375 UUID4::new()
376 );
377 self.insert(key, Some(vec![Bytes::from(json_bytes)]))
378 }
379
380 pub fn update(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
386 let op = DatabaseCommand::new(DatabaseOperation::Update, key, payload);
387 match self.tx.send(op) {
388 Ok(()) => Ok(()),
389 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
390 }
391 }
392
393 pub fn delete(&mut self, key: String, payload: Option<Vec<Bytes>>) -> anyhow::Result<()> {
399 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, payload);
400 match self.tx.send(op) {
401 Ok(()) => Ok(()),
402 Err(e) => anyhow::bail!("{FAILED_TX_CHANNEL}: {e}"),
403 }
404 }
405
406 pub fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
412 let order_id_bytes = Bytes::from(client_order_id.to_string());
413
414 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
416 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
417 self.tx
418 .send(op)
419 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
420
421 let index_keys = [
423 INDEX_ORDER_IDS,
424 INDEX_ORDERS,
425 INDEX_ORDERS_OPEN,
426 INDEX_ORDERS_CLOSED,
427 INDEX_ORDERS_EMULATED,
428 INDEX_ORDERS_INFLIGHT,
429 ];
430
431 for index_key in &index_keys {
432 let key = (*index_key).to_string();
433 let payload = vec![order_id_bytes.clone()];
434 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
435 self.tx
436 .send(op)
437 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
438 }
439
440 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
442 for index_key in &hash_indexes {
443 let key = (*index_key).to_string();
444 let payload = vec![order_id_bytes.clone()];
445 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
446 self.tx.send(op).map_err(|e| {
447 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
448 })?;
449 }
450
451 Ok(())
452 }
453
454 pub fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
460 let position_id_bytes = Bytes::from(position_id.to_string());
461
462 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
464 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
465 self.tx
466 .send(op)
467 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
468
469 let index_keys = [
471 INDEX_POSITIONS,
472 INDEX_POSITIONS_OPEN,
473 INDEX_POSITIONS_CLOSED,
474 ];
475
476 for index_key in &index_keys {
477 let key = (*index_key).to_string();
478 let payload = vec![position_id_bytes.clone()];
479 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
480 self.tx.send(op).map_err(|e| {
481 anyhow::anyhow!("Failed to send delete position index command: {e}")
482 })?;
483 }
484
485 Ok(())
486 }
487
488 pub fn delete_account_event(
494 &self,
495 _account_id: &AccountId,
496 _event_id: &str,
497 ) -> anyhow::Result<()> {
498 log::warn!("Deleting account events currently a no-op (pending redesign)");
499 Ok(())
500 }
501}
502
503fn blocking_recv<T>(rx: &mpsc::Receiver<T>) -> Result<T, mpsc::RecvError> {
504 let on_nautilus_runtime = tokio::runtime::Handle::try_current()
505 .ok()
506 .is_some_and(|h| h.id() == get_runtime().handle().id());
507
508 if on_nautilus_runtime {
509 tokio::task::block_in_place(|| rx.recv())
510 } else {
511 rx.recv()
512 }
513}
514
515async fn process_commands(
516 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseCommand>,
517 trader_key: String,
518 config: CacheConfig,
519) -> anyhow::Result<()> {
520 log_task_started(CACHE_PROCESS);
521
522 let db_config = config
523 .database
524 .as_ref()
525 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
526 let mut con = create_redis_connection(CACHE_WRITE, db_config.clone()).await?;
527
528 let mut buffer: VecDeque<DatabaseCommand> = VecDeque::new();
530 let buffer_interval = Duration::from_millis(config.buffer_interval_ms.unwrap_or(0) as u64);
531
532 let flush_timer = tokio::time::sleep(buffer_interval);
536 tokio::pin!(flush_timer);
537
538 loop {
540 tokio::select! {
541 maybe_cmd = rx.recv() => {
542 let result = handle_command(
543 maybe_cmd,
544 &mut buffer,
545 buffer_interval,
546 &mut con,
547 &trader_key,
548 ).await;
549
550 if result.is_break() {
551 break;
552 }
553 }
554 () = &mut flush_timer, if !buffer_interval.is_zero() => {
555 flush_buffer(&mut buffer, &mut con, &trader_key, &mut flush_timer, buffer_interval).await;
556 }
557 }
558 }
559
560 if !buffer.is_empty() {
562 drain_buffer(&mut con, &trader_key, &mut buffer).await;
563 }
564
565 log_task_stopped(CACHE_PROCESS);
566 Ok(())
567}
568
569async fn handle_command(
570 maybe_cmd: Option<DatabaseCommand>,
571 buffer: &mut VecDeque<DatabaseCommand>,
572 buffer_interval: Duration,
573 con: &mut ConnectionManager,
574 trader_key: &str,
575) -> ControlFlow<()> {
576 let Some(cmd) = maybe_cmd else {
577 log::debug!("Command channel closed");
578 return ControlFlow::Break(());
579 };
580
581 log::trace!("Received {cmd:?}");
582
583 match cmd.op_type {
584 DatabaseOperation::Close => {
585 if !buffer.is_empty() {
586 drain_buffer(con, trader_key, buffer).await;
587 }
588 return ControlFlow::Break(());
589 }
590 DatabaseOperation::Flush(reply_tx) => {
591 if !buffer.is_empty() {
592 drain_buffer(con, trader_key, buffer).await;
593 }
594
595 if let Err(e) = redis::cmd(REDIS_FLUSHDB).query_async::<()>(con).await {
596 log::error!("Failed to flush database: {e:?}");
597 }
598 let _ = reply_tx.send(());
599 return ControlFlow::Continue(());
600 }
601 _ => {}
602 }
603
604 buffer.push_back(cmd);
605
606 if buffer_interval.is_zero() {
607 drain_buffer(con, trader_key, buffer).await;
608 }
609
610 ControlFlow::Continue(())
611}
612
613async fn flush_buffer(
614 buffer: &mut VecDeque<DatabaseCommand>,
615 con: &mut ConnectionManager,
616 trader_key: &str,
617 flush_timer: &mut Pin<&mut tokio::time::Sleep>,
618 buffer_interval: Duration,
619) {
620 if !buffer.is_empty() {
621 drain_buffer(con, trader_key, buffer).await;
622 }
623 flush_timer
624 .as_mut()
625 .reset(tokio::time::Instant::now() + buffer_interval);
626}
627
628async fn drain_buffer(
629 conn: &mut ConnectionManager,
630 trader_key: &str,
631 buffer: &mut VecDeque<DatabaseCommand>,
632) {
633 let mut pipe = redis::pipe();
634 pipe.atomic();
635
636 for msg in buffer.drain(..) {
637 let key = if let Some(key) = msg.key {
638 key
639 } else {
640 log::error!("Null key found for message: {msg:?}");
641 continue;
642 };
643 let collection = match get_collection_key(&key) {
644 Ok(collection) => collection,
645 Err(e) => {
646 log::error!("{e}");
647 continue; }
649 };
650
651 let key = format!("{trader_key}{REDIS_DELIMITER}{}", &key);
652
653 match msg.op_type {
654 DatabaseOperation::Insert => {
655 if let Some(payload) = msg.payload {
656 log::debug!("Processing INSERT for collection: {collection}, key: {key}");
657 if let Err(e) = insert(&mut pipe, collection, &key, &payload) {
658 log::error!("{e}");
659 }
660 } else {
661 log::error!("Null `payload` for `insert`");
662 }
663 }
664 DatabaseOperation::Update => {
665 if let Some(payload) = msg.payload {
666 log::debug!("Processing UPDATE for collection: {collection}, key: {key}");
667 if let Err(e) = update(&mut pipe, collection, &key, &payload) {
668 log::error!("{e}");
669 }
670 } else {
671 log::error!("Null `payload` for `update`");
672 }
673 }
674 DatabaseOperation::Delete => {
675 log::debug!(
676 "Processing DELETE for collection: {}, key: {}, payload: {:?}",
677 collection,
678 key,
679 msg.payload.as_ref().map(std::vec::Vec::len)
680 );
681 if let Err(e) = delete(&mut pipe, collection, &key, msg.payload) {
683 log::error!("{e}");
684 }
685 }
686 DatabaseOperation::Close => panic!("Close command should not be drained"),
687 DatabaseOperation::Flush(_) => panic!("Flush command should not be drained"),
688 }
689 }
690
691 if let Err(e) = pipe.query_async::<()>(conn).await {
692 log::error!("{e}");
693 }
694}
695
696fn insert(pipe: &mut Pipeline, collection: &str, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
697 check_slice_not_empty(value, stringify!(value))?;
698
699 match collection {
700 INDEX => insert_index(pipe, key, value),
701 GENERAL => {
702 insert_string(pipe, key, value[0].as_ref());
703 Ok(())
704 }
705 CURRENCIES => {
706 insert_string(pipe, key, value[0].as_ref());
707 Ok(())
708 }
709 INSTRUMENTS => {
710 insert_string(pipe, key, value[0].as_ref());
711 Ok(())
712 }
713 SYNTHETICS => {
714 insert_string(pipe, key, value[0].as_ref());
715 Ok(())
716 }
717 ACCOUNTS => {
718 insert_list(pipe, key, value[0].as_ref());
719 Ok(())
720 }
721 ORDERS => {
722 insert_list(pipe, key, value[0].as_ref());
723 Ok(())
724 }
725 POSITIONS => {
726 insert_list(pipe, key, value[0].as_ref());
727 Ok(())
728 }
729 ACTORS => {
730 insert_string(pipe, key, value[0].as_ref());
731 Ok(())
732 }
733 STRATEGIES => {
734 insert_string(pipe, key, value[0].as_ref());
735 Ok(())
736 }
737 SNAPSHOTS => {
738 insert_list(pipe, key, value[0].as_ref());
739 Ok(())
740 }
741 HEALTH => {
742 insert_string(pipe, key, value[0].as_ref());
743 Ok(())
744 }
745 CUSTOM => {
746 insert_string(pipe, key, value[0].as_ref());
747 Ok(())
748 }
749 _ => anyhow::bail!("Unsupported operation: `insert` for collection '{collection}'"),
750 }
751}
752
753fn insert_index(pipe: &mut Pipeline, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
754 let index_key = get_index_key(key)?;
755 match index_key {
756 INDEX_ORDER_IDS => {
757 insert_set(pipe, key, value[0].as_ref());
758 Ok(())
759 }
760 INDEX_ORDER_POSITION => {
761 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
762 Ok(())
763 }
764 INDEX_ORDER_CLIENT => {
765 insert_hset(pipe, key, value[0].as_ref(), value[1].as_ref());
766 Ok(())
767 }
768 INDEX_ORDERS => {
769 insert_set(pipe, key, value[0].as_ref());
770 Ok(())
771 }
772 INDEX_ORDERS_OPEN => {
773 insert_set(pipe, key, value[0].as_ref());
774 Ok(())
775 }
776 INDEX_ORDERS_CLOSED => {
777 insert_set(pipe, key, value[0].as_ref());
778 Ok(())
779 }
780 INDEX_ORDERS_EMULATED => {
781 insert_set(pipe, key, value[0].as_ref());
782 Ok(())
783 }
784 INDEX_ORDERS_INFLIGHT => {
785 insert_set(pipe, key, value[0].as_ref());
786 Ok(())
787 }
788 INDEX_POSITIONS => {
789 insert_set(pipe, key, value[0].as_ref());
790 Ok(())
791 }
792 INDEX_POSITIONS_OPEN => {
793 insert_set(pipe, key, value[0].as_ref());
794 Ok(())
795 }
796 INDEX_POSITIONS_CLOSED => {
797 insert_set(pipe, key, value[0].as_ref());
798 Ok(())
799 }
800 _ => anyhow::bail!("Index unknown '{index_key}' on insert"),
801 }
802}
803
/// Adds a `set` of `key` to `value` to the pipeline.
fn insert_string(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.set(key, value);
}
807
/// Adds an `sadd` of `value` to the set at `key` to the pipeline.
fn insert_set(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.sadd(key, value);
}
811
/// Adds an `hset` of field `name` to `value` in the hash at `key` to the pipeline.
fn insert_hset(pipe: &mut Pipeline, key: &str, name: &[u8], value: &[u8]) {
    pipe.hset(key, name, value);
}
815
/// Adds an `rpush` of `value` onto the list at `key` to the pipeline.
fn insert_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush(key, value);
}
819
820fn update(pipe: &mut Pipeline, collection: &str, key: &str, value: &[Bytes]) -> anyhow::Result<()> {
821 check_slice_not_empty(value, stringify!(value))?;
822
823 match collection {
824 ACCOUNTS => {
825 update_list(pipe, key, value[0].as_ref());
826 Ok(())
827 }
828 ORDERS => {
829 update_list(pipe, key, value[0].as_ref());
830 Ok(())
831 }
832 POSITIONS => {
833 update_list(pipe, key, value[0].as_ref());
834 Ok(())
835 }
836 _ => anyhow::bail!("Unsupported operation: `update` for collection '{collection}'"),
837 }
838}
839
/// Adds an `rpush_exists` of `value` onto the list at `key` to the pipeline.
// NOTE(review): `rpush_exists` presumably maps to Redis `RPUSHX` (push only
// when the list already exists) - confirm against the redis crate docs.
fn update_list(pipe: &mut Pipeline, key: &str, value: &[u8]) {
    pipe.rpush_exists(key, value);
}
843
844fn delete(
845 pipe: &mut Pipeline,
846 collection: &str,
847 key: &str,
848 value: Option<Vec<Bytes>>,
849) -> anyhow::Result<()> {
850 log::debug!(
851 "delete: collection={}, key={}, has_payload={}",
852 collection,
853 key,
854 value.is_some()
855 );
856
857 match collection {
858 INDEX => delete_from_index(pipe, key, value),
859 ORDERS => {
860 delete_string(pipe, key);
861 Ok(())
862 }
863 POSITIONS => {
864 delete_string(pipe, key);
865 Ok(())
866 }
867 ACCOUNTS => {
868 delete_string(pipe, key);
869 Ok(())
870 }
871 ACTORS => {
872 delete_string(pipe, key);
873 Ok(())
874 }
875 STRATEGIES => {
876 delete_string(pipe, key);
877 Ok(())
878 }
879 _ => anyhow::bail!("Unsupported operation: `delete` for collection '{collection}'"),
880 }
881}
882
883fn delete_from_index(
884 pipe: &mut Pipeline,
885 key: &str,
886 value: Option<Vec<Bytes>>,
887) -> anyhow::Result<()> {
888 let value = value.ok_or_else(|| anyhow::anyhow!("Empty `payload` for `delete` '{key}'"))?;
889 let index_key = get_index_key(key)?;
890
891 match index_key {
892 INDEX_ORDER_IDS => {
893 remove_from_set(pipe, key, value[0].as_ref());
894 Ok(())
895 }
896 INDEX_ORDER_POSITION => {
897 remove_from_hash(pipe, key, value[0].as_ref());
898 Ok(())
899 }
900 INDEX_ORDER_CLIENT => {
901 remove_from_hash(pipe, key, value[0].as_ref());
902 Ok(())
903 }
904 INDEX_ORDERS => {
905 remove_from_set(pipe, key, value[0].as_ref());
906 Ok(())
907 }
908 INDEX_ORDERS_OPEN => {
909 remove_from_set(pipe, key, value[0].as_ref());
910 Ok(())
911 }
912 INDEX_ORDERS_CLOSED => {
913 remove_from_set(pipe, key, value[0].as_ref());
914 Ok(())
915 }
916 INDEX_ORDERS_EMULATED => {
917 remove_from_set(pipe, key, value[0].as_ref());
918 Ok(())
919 }
920 INDEX_ORDERS_INFLIGHT => {
921 remove_from_set(pipe, key, value[0].as_ref());
922 Ok(())
923 }
924 INDEX_POSITIONS => {
925 remove_from_set(pipe, key, value[0].as_ref());
926 Ok(())
927 }
928 INDEX_POSITIONS_OPEN => {
929 remove_from_set(pipe, key, value[0].as_ref());
930 Ok(())
931 }
932 INDEX_POSITIONS_CLOSED => {
933 remove_from_set(pipe, key, value[0].as_ref());
934 Ok(())
935 }
936 _ => anyhow::bail!("Unsupported index operation: remove from '{index_key}'"),
937 }
938}
939
/// Adds an `srem` of `member` from the set at `key` to the pipeline.
fn remove_from_set(pipe: &mut Pipeline, key: &str, member: &[u8]) {
    pipe.srem(key, member);
}
943
/// Adds an `hdel` of `field` from the hash at `key` to the pipeline.
fn remove_from_hash(pipe: &mut Pipeline, key: &str, field: &[u8]) {
    pipe.hdel(key, field);
}
947
/// Adds a `del` of `key` to the pipeline.
fn delete_string(pipe: &mut Pipeline, key: &str) {
    pipe.del(key);
}
951
952fn get_trader_key(trader_id: TraderId, instance_id: UUID4, config: &CacheConfig) -> String {
953 let mut key = String::new();
954
955 if config.use_trader_prefix {
956 key.push_str("trader-");
957 }
958
959 key.push_str(trader_id.as_str());
960
961 if config.use_instance_id {
962 key.push(REDIS_DELIMITER);
963 key.push_str(&format!("{instance_id}"));
964 }
965
966 key
967}
968
969fn get_collection_key(key: &str) -> anyhow::Result<&str> {
970 key.split_once(REDIS_DELIMITER)
971 .map(|(collection, _)| collection)
972 .ok_or_else(|| {
973 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
974 })
975}
976
/// Adapter exposing a [`RedisCacheDatabase`] through the `CacheDatabaseAdapter` trait.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RedisCacheDatabaseAdapter {
    /// The serialization encoding passed to the load queries.
    pub encoding: SerializationEncoding,
    /// The underlying Redis cache database.
    pub database: RedisCacheDatabase,
}
983
984#[allow(dead_code)]
985#[allow(unused)]
986#[async_trait::async_trait]
987impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
    /// Closes the underlying database; always returns `Ok(())`.
    fn close(&mut self) -> anyhow::Result<()> {
        self.database.close();
        Ok(())
    }
992
    /// Flushes the underlying database via `flushdb_sync`, blocking until it replies.
    fn flush(&mut self) -> anyhow::Result<()> {
        self.database.flushdb_sync()
    }
996
    /// Loads currencies, instruments, synthetics, accounts, orders, positions,
    /// greeks and yield curves, and assembles them into a `CacheMap`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the individual loads fails.
    async fn load_all(&self) -> anyhow::Result<CacheMap> {
        log::debug!("Loading all data");

        // All loaders are joined together; an error from any of them is returned
        let (
            currencies,
            instruments,
            synthetics,
            accounts,
            orders,
            positions,
            greeks,
            yield_curves,
        ) = tokio::try_join!(
            self.load_currencies(),
            self.load_instruments(),
            self.load_synthetics(),
            self.load_accounts(),
            self.load_orders(),
            self.load_positions(),
            self.load_greeks(),
            self.load_yield_curves()
        )
        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;

        Ok(CacheMap {
            currencies,
            instruments,
            synthetics,
            accounts,
            orders,
            positions,
            greeks,
            yield_curves,
        })
    }
1032
    /// Always returns an empty map.
    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
        Ok(AHashMap::new())
    }
1037
1038 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
1039 DatabaseQueries::load_currencies(
1040 &self.database.con,
1041 &self.database.trader_key,
1042 self.encoding,
1043 )
1044 .await
1045 }
1046
1047 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
1048 DatabaseQueries::load_instruments(
1049 &self.database.con,
1050 &self.database.trader_key,
1051 self.encoding,
1052 )
1053 .await
1054 }
1055
1056 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
1057 DatabaseQueries::load_synthetics(
1058 &self.database.con,
1059 &self.database.trader_key,
1060 self.encoding,
1061 )
1062 .await
1063 }
1064
1065 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
1066 DatabaseQueries::load_accounts(&self.database.con, &self.database.trader_key, self.encoding)
1067 .await
1068 }
1069
1070 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
1071 DatabaseQueries::load_orders(&self.database.con, &self.database.trader_key, self.encoding)
1072 .await
1073 }
1074
1075 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
1076 DatabaseQueries::load_positions(
1077 &self.database.con,
1078 &self.database.trader_key,
1079 self.encoding,
1080 )
1081 .await
1082 }
1083
    /// Not yet implemented; panics via `todo!()` when called.
    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
        todo!()
    }
1087
    /// Not yet implemented; panics via `todo!()` when called.
    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
        todo!()
    }
1091
1092 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
1093 DatabaseQueries::load_currency(
1094 &self.database.con,
1095 &self.database.trader_key,
1096 code,
1097 self.encoding,
1098 )
1099 .await
1100 }
1101
1102 async fn load_instrument(
1103 &self,
1104 instrument_id: &InstrumentId,
1105 ) -> anyhow::Result<Option<InstrumentAny>> {
1106 DatabaseQueries::load_instrument(
1107 &self.database.con,
1108 &self.database.trader_key,
1109 instrument_id,
1110 self.encoding,
1111 )
1112 .await
1113 }
1114
1115 async fn load_synthetic(
1116 &self,
1117 instrument_id: &InstrumentId,
1118 ) -> anyhow::Result<Option<SyntheticInstrument>> {
1119 DatabaseQueries::load_synthetic(
1120 &self.database.con,
1121 &self.database.trader_key,
1122 instrument_id,
1123 self.encoding,
1124 )
1125 .await
1126 }
1127
1128 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
1129 DatabaseQueries::load_account(
1130 &self.database.con,
1131 &self.database.trader_key,
1132 account_id,
1133 self.encoding,
1134 )
1135 .await
1136 }
1137
1138 async fn load_order(
1139 &self,
1140 client_order_id: &ClientOrderId,
1141 ) -> anyhow::Result<Option<OrderAny>> {
1142 DatabaseQueries::load_order(
1143 &self.database.con,
1144 &self.database.trader_key,
1145 client_order_id,
1146 self.encoding,
1147 )
1148 .await
1149 }
1150
1151 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
1152 DatabaseQueries::load_position(
1153 &self.database.con,
1154 &self.database.trader_key,
1155 position_id,
1156 self.encoding,
1157 )
1158 .await
1159 }
1160
    /// Not yet implemented; panics via `todo!()` when called.
    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }
1164
    /// Not yet implemented; panics via `todo!()` when called.
    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
        todo!()
    }
1168
1169 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
1170 anyhow::bail!("Loading signals from Redis cache adapter not supported")
1171 }
1172
    /// Loads custom data for `data_type` by delegating to the underlying database.
    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
        self.database.load_custom_data(data_type)
    }
1176
1177 fn load_order_snapshot(
1178 &self,
1179 client_order_id: &ClientOrderId,
1180 ) -> anyhow::Result<Option<OrderSnapshot>> {
1181 anyhow::bail!("Loading order snapshots from Redis cache adapter not supported")
1182 }
1183
1184 fn load_position_snapshot(
1185 &self,
1186 position_id: &PositionId,
1187 ) -> anyhow::Result<Option<PositionSnapshot>> {
1188 anyhow::bail!("Loading position snapshots from Redis cache adapter not supported")
1189 }
1190
1191 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
1192 anyhow::bail!("Loading quote data for Redis cache adapter not supported")
1193 }
1194
1195 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
1196 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1197 }
1198
1199 fn load_funding_rates(
1200 &self,
1201 instrument_id: &InstrumentId,
1202 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
1203 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1204 }
1205
1206 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
1207 anyhow::bail!("Loading market data for Redis cache adapter not supported")
1208 }
1209
    /// Not yet implemented; panics via `todo!()` when called.
    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
        todo!()
    }
1213
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
        todo!()
    }
1217
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
        todo!()
    }
1221
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
        todo!()
    }
1225
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
        todo!()
    }
1229
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
        todo!()
    }
1233
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
        todo!()
    }
1237
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
1241
    /// Not yet implemented; panics via `todo!()` when called.
    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
        todo!()
    }
1245
1246 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
1247 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1248 }
1249
1250 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
1251 anyhow::bail!("Saving signals for Redis cache adapter not supported")
1252 }
1253
1254 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
1255 let json_bytes = serde_json::to_vec(data)
1256 .map_err(|e| anyhow::anyhow!("CustomData serialization failed: {e}"))?;
1257 let ts_init = data.ts_init().as_u64();
1258 let key = format!(
1259 "{CUSTOM}{REDIS_DELIMITER}{:020}{REDIS_DELIMITER}{}",
1260 ts_init,
1261 UUID4::new()
1262 );
1263 self.database
1264 .insert(key, Some(vec![Bytes::from(json_bytes)]))
1265 }
1266
1267 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
1268 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1269 }
1270
1271 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
1272 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1273 }
1274
1275 fn add_funding_rate(&self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
1276 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1277 }
1278
1279 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
1280 anyhow::bail!("Saving market data for Redis cache adapter not supported")
1281 }
1282
/// Unimplemented stub: deleting an actor is not yet supported by this adapter.
///
/// The `component_id` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
    todo!()
}
1286
/// Unimplemented stub: deleting a strategy is not yet supported by this adapter.
///
/// The `component_id` argument (a `StrategyId`) is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
    todo!()
}
1290
1291 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
1292 let order_id_bytes = Bytes::from(client_order_id.to_string());
1293
1294 log::debug!("Deleting order: {client_order_id} from Redis");
1295 log::debug!("Trader key: {}", self.database.trader_key);
1296
1297 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
1299 log::debug!("Deleting order key: {key}");
1300 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1301 self.database
1302 .tx
1303 .send(op)
1304 .map_err(|e| anyhow::anyhow!("Failed to send delete order command: {e}"))?;
1305
1306 let index_keys = [
1308 INDEX_ORDER_IDS,
1309 INDEX_ORDERS,
1310 INDEX_ORDERS_OPEN,
1311 INDEX_ORDERS_CLOSED,
1312 INDEX_ORDERS_EMULATED,
1313 INDEX_ORDERS_INFLIGHT,
1314 ];
1315
1316 for index_key in &index_keys {
1317 let key = (*index_key).to_string();
1318 log::debug!("Deleting from index: {key} (order_id: {client_order_id})");
1319 let payload = vec![order_id_bytes.clone()];
1320 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1321 self.database
1322 .tx
1323 .send(op)
1324 .map_err(|e| anyhow::anyhow!("Failed to send delete order index command: {e}"))?;
1325 }
1326
1327 let hash_indexes = [INDEX_ORDER_POSITION, INDEX_ORDER_CLIENT];
1329 for index_key in &hash_indexes {
1330 let key = (*index_key).to_string();
1331 log::debug!("Deleting from hash index: {key} (order_id: {client_order_id})");
1332 let payload = vec![order_id_bytes.clone()];
1333 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1334 self.database.tx.send(op).map_err(|e| {
1335 anyhow::anyhow!("Failed to send delete order hash index command: {e}")
1336 })?;
1337 }
1338
1339 log::debug!("Sent all delete commands for order: {client_order_id}");
1340 Ok(())
1341 }
1342
1343 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
1344 let position_id_bytes = Bytes::from(position_id.to_string());
1345
1346 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
1348 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, None);
1349 self.database
1350 .tx
1351 .send(op)
1352 .map_err(|e| anyhow::anyhow!("Failed to send delete position command: {e}"))?;
1353
1354 let index_keys = [
1356 INDEX_POSITIONS,
1357 INDEX_POSITIONS_OPEN,
1358 INDEX_POSITIONS_CLOSED,
1359 ];
1360
1361 for index_key in &index_keys {
1362 let key = (*index_key).to_string();
1363 let payload = vec![position_id_bytes.clone()];
1364 let op = DatabaseCommand::new(DatabaseOperation::Delete, key, Some(payload));
1365 self.database.tx.send(op).map_err(|e| {
1366 anyhow::anyhow!("Failed to send delete position index command: {e}")
1367 })?;
1368 }
1369
1370 Ok(())
1371 }
1372
/// Unimplemented stub: deleting an account event is not yet supported by this adapter.
///
/// Both `account_id` and `event_id` are currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
    todo!()
}
1376
/// Unimplemented stub: indexing a venue order ID against a client order ID is not yet
/// supported by this adapter.
///
/// Both `client_order_id` and `venue_order_id` are currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn index_venue_order_id(
    &self,
    client_order_id: ClientOrderId,
    venue_order_id: VenueOrderId,
) -> anyhow::Result<()> {
    todo!()
}
1384
/// Unimplemented stub: indexing a position ID against a client order ID is not yet
/// supported by this adapter.
///
/// Both `client_order_id` and `position_id` are currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn index_order_position(
    &self,
    client_order_id: ClientOrderId,
    position_id: PositionId,
) -> anyhow::Result<()> {
    todo!()
}
1392
/// Unimplemented stub: updating an actor is not yet supported by this adapter.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn update_actor(&self) -> anyhow::Result<()> {
    todo!()
}
1396
/// Unimplemented stub: updating a strategy is not yet supported by this adapter.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn update_strategy(&self) -> anyhow::Result<()> {
    todo!()
}
1400
/// Unimplemented stub: updating an account is not yet supported by this adapter.
///
/// The `account` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
    todo!()
}
1404
/// Unimplemented stub: updating an order from an order event is not yet supported by
/// this adapter.
///
/// The `order_event` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()> {
    todo!()
}
1408
/// Unimplemented stub: updating a position is not yet supported by this adapter.
///
/// The `position` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn update_position(&self, position: &Position) -> anyhow::Result<()> {
    todo!()
}
1412
/// Unimplemented stub: snapshotting order state is not yet supported by this adapter.
///
/// The `order` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
    todo!()
}
1416
/// Unimplemented stub: snapshotting position state is not yet supported by this adapter.
///
/// The `position` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
    todo!()
}
1420
/// Unimplemented stub: recording a heartbeat is not yet supported by this adapter.
///
/// The `timestamp` argument is currently unused.
///
/// # Panics
///
/// Always panics, because the body is a bare `todo!()`.
fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
    todo!()
}
1424}
1425
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// With `use_instance_id` enabled (all other options left at their defaults), the
    /// key is expected to start with the prefixed trader ID and end with the instance ID.
    #[rstest]
    fn test_get_trader_key_with_prefix_and_instance_id() {
        let instance_id = UUID4::new();
        let config = CacheConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let key = get_trader_key(TraderId::from("tester-123"), instance_id, &config);

        let expected_suffix = instance_id.to_string();
        assert!(key.starts_with("trader-tester-123:"));
        assert!(key.ends_with(expected_suffix.as_str()));
    }

    /// A key containing a delimiter is expected to yield the part before it.
    #[rstest]
    fn test_get_collection_key_valid() {
        let collection = get_collection_key("collection:123").unwrap();
        assert_eq!(collection, "collection");
    }

    /// A key without a delimiter is expected to be rejected with an error.
    #[rstest]
    fn test_get_collection_key_invalid() {
        let outcome = get_collection_key("no_delimiter");
        assert!(outcome.is_err());
    }
}