Skip to main content

nautilus_data/engine/
streaming.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use nautilus_common::messages::data::{
18    SubscribeBars, SubscribeCommand, SubscribeCustomData, SubscribeQuotes, SubscribeTrades,
19};
20use nautilus_core::{
21    Params,
22    correctness::{FAILED, check_key_not_in_map},
23};
24use nautilus_persistence::backend::catalog::ParquetDataCatalog;
25use serde_json::Value;
26use ustr::Ustr;
27
28use super::DataEngine;
29
30pub(crate) type CatalogMap = AHashMap<Ustr, ParquetDataCatalog>;
31
32impl DataEngine {
33    /// Registers the `catalog` with the engine with an optional specific `name`.
34    ///
35    /// # Panics
36    ///
37    /// Panics if a catalog with the same `name` has already been registered.
38    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
39        let name = Ustr::from(name.unwrap_or("catalog_0"));
40
41        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
42
43        self.catalogs.insert(name, catalog);
44        log::info!("Registered catalog <{name}>");
45    }
46
47    pub(super) fn subscribe_command_with_prefilled_start_ns(
48        &self,
49        cmd: SubscribeCommand,
50    ) -> anyhow::Result<SubscribeCommand> {
51        match cmd {
52            SubscribeCommand::Quotes(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
53                let identifier = cmd.instrument_id.to_string();
54                let params = self.params_with_prefilled_start_ns(
55                    cmd.params.as_ref(),
56                    "quotes",
57                    &identifier,
58                )?;
59                Ok(SubscribeCommand::Quotes(SubscribeQuotes { params, ..cmd }))
60            }
61            SubscribeCommand::Trades(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
62                let identifier = cmd.instrument_id.to_string();
63                let params = self.params_with_prefilled_start_ns(
64                    cmd.params.as_ref(),
65                    "trades",
66                    &identifier,
67                )?;
68                Ok(SubscribeCommand::Trades(SubscribeTrades { params, ..cmd }))
69            }
70            SubscribeCommand::Bars(cmd)
71                if cmd.bar_type.is_externally_aggregated()
72                    && Self::is_start_ns_missing(cmd.params.as_ref()) =>
73            {
74                let identifier = cmd.bar_type.to_string();
75                let params =
76                    self.params_with_prefilled_start_ns(cmd.params.as_ref(), "bars", &identifier)?;
77                Ok(SubscribeCommand::Bars(SubscribeBars { params, ..cmd }))
78            }
79            SubscribeCommand::Data(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
80                let type_name = cmd.data_type.type_name().to_string();
81                let identifier = cmd.data_type.identifier().map(String::from);
82                let params = self.params_with_custom_data_prefilled_start_ns(
83                    cmd.params.as_ref(),
84                    &type_name,
85                    identifier.as_deref(),
86                )?;
87                Ok(SubscribeCommand::Data(SubscribeCustomData {
88                    params,
89                    ..cmd
90                }))
91            }
92            _ => Ok(cmd),
93        }
94    }
95
96    fn is_start_ns_missing(params: Option<&Params>) -> bool {
97        params.is_none_or(|params| !params.contains_key("start_ns"))
98    }
99
100    fn params_with_prefilled_start_ns(
101        &self,
102        params: Option<&Params>,
103        data_cls: &str,
104        identifier: &str,
105    ) -> anyhow::Result<Option<Params>> {
106        let last_timestamp = self.catalog_last_timestamp(data_cls, identifier)?;
107
108        Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
109    }
110
111    fn params_with_custom_data_prefilled_start_ns(
112        &self,
113        params: Option<&Params>,
114        type_name: &str,
115        identifier: Option<&str>,
116    ) -> anyhow::Result<Option<Params>> {
117        let last_timestamp = self.catalog_custom_data_last_timestamp(type_name, identifier)?;
118
119        Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
120    }
121
122    fn params_with_start_ns(params: Option<&Params>, last_timestamp: Option<u64>) -> Params {
123        let start_ns = last_timestamp.map_or(Value::Null, |last_timestamp| {
124            Value::from(last_timestamp.saturating_add(1))
125        });
126        let mut params = params.cloned().unwrap_or_else(Params::new);
127
128        params.insert("start_ns".to_string(), start_ns);
129
130        params
131    }
132
133    fn catalog_last_timestamp(
134        &self,
135        data_cls: &str,
136        identifier: &str,
137    ) -> anyhow::Result<Option<u64>> {
138        for catalog in self.catalogs.values() {
139            if let Some(last_timestamp) =
140                catalog.query_last_timestamp(data_cls, Some(identifier))?
141            {
142                return Ok(Some(last_timestamp));
143            }
144        }
145
146        Ok(None)
147    }
148
149    fn catalog_custom_data_last_timestamp(
150        &self,
151        type_name: &str,
152        identifier: Option<&str>,
153    ) -> anyhow::Result<Option<u64>> {
154        for catalog in self.catalogs.values() {
155            let last_timestamp = if let Some(identifier) = identifier {
156                let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
157                let intervals = catalog.get_directory_intervals(&directory)?;
158                intervals.last().map(|(_, last_timestamp)| *last_timestamp)
159            } else {
160                let data_cls = format!("custom/{type_name}");
161                catalog.query_last_timestamp(&data_cls, None)?
162            };
163
164            if let Some(last_timestamp) = last_timestamp {
165                return Ok(Some(last_timestamp));
166            }
167        }
168
169        Ok(None)
170    }
171}