nautilus_data/engine/
streaming.rs1use ahash::AHashMap;
17use nautilus_common::messages::data::{
18 SubscribeBars, SubscribeCommand, SubscribeCustomData, SubscribeQuotes, SubscribeTrades,
19};
20use nautilus_core::{
21 Params,
22 correctness::{FAILED, check_key_not_in_map},
23};
24use nautilus_persistence::backend::catalog::ParquetDataCatalog;
25use serde_json::Value;
26use ustr::Ustr;
27
28use super::DataEngine;
29
/// Parquet data catalogs keyed by an interned (`Ustr`) name.
// NOTE(review): presumably the type of `DataEngine::catalogs`; the field declaration is not
// visible in this file, so confirm against the engine struct.
30pub(crate) type CatalogMap = AHashMap<Ustr, ParquetDataCatalog>;
31
32impl DataEngine {
33 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
39 let name = Ustr::from(name.unwrap_or("catalog_0"));
40
41 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
42
43 self.catalogs.insert(name, catalog);
44 log::info!("Registered catalog <{name}>");
45 }
46
47 pub(super) fn subscribe_command_with_prefilled_start_ns(
48 &self,
49 cmd: SubscribeCommand,
50 ) -> anyhow::Result<SubscribeCommand> {
51 match cmd {
52 SubscribeCommand::Quotes(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
53 let identifier = cmd.instrument_id.to_string();
54 let params = self.params_with_prefilled_start_ns(
55 cmd.params.as_ref(),
56 "quotes",
57 &identifier,
58 )?;
59 Ok(SubscribeCommand::Quotes(SubscribeQuotes { params, ..cmd }))
60 }
61 SubscribeCommand::Trades(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
62 let identifier = cmd.instrument_id.to_string();
63 let params = self.params_with_prefilled_start_ns(
64 cmd.params.as_ref(),
65 "trades",
66 &identifier,
67 )?;
68 Ok(SubscribeCommand::Trades(SubscribeTrades { params, ..cmd }))
69 }
70 SubscribeCommand::Bars(cmd)
71 if cmd.bar_type.is_externally_aggregated()
72 && Self::is_start_ns_missing(cmd.params.as_ref()) =>
73 {
74 let identifier = cmd.bar_type.to_string();
75 let params =
76 self.params_with_prefilled_start_ns(cmd.params.as_ref(), "bars", &identifier)?;
77 Ok(SubscribeCommand::Bars(SubscribeBars { params, ..cmd }))
78 }
79 SubscribeCommand::Data(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
80 let type_name = cmd.data_type.type_name().to_string();
81 let identifier = cmd.data_type.identifier().map(String::from);
82 let params = self.params_with_custom_data_prefilled_start_ns(
83 cmd.params.as_ref(),
84 &type_name,
85 identifier.as_deref(),
86 )?;
87 Ok(SubscribeCommand::Data(SubscribeCustomData {
88 params,
89 ..cmd
90 }))
91 }
92 _ => Ok(cmd),
93 }
94 }
95
96 fn is_start_ns_missing(params: Option<&Params>) -> bool {
97 params.is_none_or(|params| !params.contains_key("start_ns"))
98 }
99
100 fn params_with_prefilled_start_ns(
101 &self,
102 params: Option<&Params>,
103 data_cls: &str,
104 identifier: &str,
105 ) -> anyhow::Result<Option<Params>> {
106 let last_timestamp = self.catalog_last_timestamp(data_cls, identifier)?;
107
108 Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
109 }
110
111 fn params_with_custom_data_prefilled_start_ns(
112 &self,
113 params: Option<&Params>,
114 type_name: &str,
115 identifier: Option<&str>,
116 ) -> anyhow::Result<Option<Params>> {
117 let last_timestamp = self.catalog_custom_data_last_timestamp(type_name, identifier)?;
118
119 Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
120 }
121
122 fn params_with_start_ns(params: Option<&Params>, last_timestamp: Option<u64>) -> Params {
123 let start_ns = last_timestamp.map_or(Value::Null, |last_timestamp| {
124 Value::from(last_timestamp.saturating_add(1))
125 });
126 let mut params = params.cloned().unwrap_or_else(Params::new);
127
128 params.insert("start_ns".to_string(), start_ns);
129
130 params
131 }
132
133 fn catalog_last_timestamp(
134 &self,
135 data_cls: &str,
136 identifier: &str,
137 ) -> anyhow::Result<Option<u64>> {
138 for catalog in self.catalogs.values() {
139 if let Some(last_timestamp) =
140 catalog.query_last_timestamp(data_cls, Some(identifier))?
141 {
142 return Ok(Some(last_timestamp));
143 }
144 }
145
146 Ok(None)
147 }
148
149 fn catalog_custom_data_last_timestamp(
150 &self,
151 type_name: &str,
152 identifier: Option<&str>,
153 ) -> anyhow::Result<Option<u64>> {
154 for catalog in self.catalogs.values() {
155 let last_timestamp = if let Some(identifier) = identifier {
156 let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
157 let intervals = catalog.get_directory_intervals(&directory)?;
158 intervals.last().map(|(_, last_timestamp)| *last_timestamp)
159 } else {
160 let data_cls = format!("custom/{type_name}");
161 catalog.query_last_timestamp(&data_cls, None)?
162 };
163
164 if let Some(last_timestamp) = last_timestamp {
165 return Ok(Some(last_timestamp));
166 }
167 }
168
169 Ok(None)
170 }
171}