Skip to main content

nautilus_common/msgbus/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::fmt::Debug;
17
18use bytes::Bytes;
19use nautilus_core::UUID4;
20use nautilus_model::identifiers::TraderId;
21use serde::{Deserialize, Serialize};
22use ustr::Ustr;
23
24use crate::enums::SerializationEncoding;
25
/// Configuration for database connections.
///
/// The struct is annotated with `#[serde(default)]`, so any field missing from the input during
/// deserialization takes the value supplied by the [`Default`] implementation.
///
/// The manual [`Debug`] implementation masks `password` (a present password is shown as `"***"`),
/// so formatting a config with `{:?}` does not print it. Note that the derived `Serialize`
/// implementation applies no such masking and emits the password as-is.
///
/// # Notes
///
/// If `database_type` is `"redis"`, it requires Redis version 6.2 or higher for correct operation.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConfig {
    /// The database type (defaults to `"redis"`).
    ///
    /// May also be supplied under the key `type` when deserializing.
    #[serde(alias = "type")]
    pub database_type: String,
    /// The database host address. If `None` (the default), the typical default should be used.
    pub host: Option<String>,
    /// The database port. If `None` (the default), the typical default should be used.
    pub port: Option<u16>,
    /// The account username for the database connection (defaults to `None`).
    pub username: Option<String>,
    /// The account password for the database connection (defaults to `None`).
    ///
    /// Masked in `Debug` output.
    pub password: Option<String>,
    /// If the database should use an SSL-enabled connection (defaults to `false`).
    pub ssl: bool,
    /// The timeout (in seconds) to wait for a new connection (defaults to 20).
    pub connection_timeout: u16,
    /// The timeout (in seconds) to wait for a response (defaults to 20).
    pub response_timeout: u16,
    /// The number of retry attempts with exponential backoff for connection attempts
    /// (defaults to 100).
    pub number_of_retries: usize,
    /// The base value for exponential backoff calculation (defaults to 2).
    pub exponent_base: u64,
    /// The maximum delay between retry attempts (in seconds) (defaults to 1000).
    pub max_delay: u64,
    /// The multiplication factor for retry delay calculation (defaults to 2).
    pub factor: u64,
}
68
69impl Debug for DatabaseConfig {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        let redacted = self.password.as_ref().map(|_| "***");
72        f.debug_struct(stringify!(DatabaseConfig))
73            .field("database_type", &self.database_type)
74            .field("host", &self.host)
75            .field("port", &self.port)
76            .field("username", &self.username)
77            .field("password", &redacted)
78            .field("ssl", &self.ssl)
79            .field("connection_timeout", &self.connection_timeout)
80            .field("response_timeout", &self.response_timeout)
81            .field("number_of_retries", &self.number_of_retries)
82            .field("exponent_base", &self.exponent_base)
83            .field("max_delay", &self.max_delay)
84            .field("factor", &self.factor)
85            .finish()
86    }
87}
88
89impl Default for DatabaseConfig {
90    /// Creates a new default [`DatabaseConfig`] instance.
91    fn default() -> Self {
92        Self {
93            database_type: "redis".to_string(),
94            host: None,
95            port: None,
96            username: None,
97            password: None,
98            ssl: false,
99            connection_timeout: 20,
100            response_timeout: 20,
101            number_of_retries: 100,
102            exponent_base: 2,
103            max_delay: 1000,
104            factor: 2,
105        }
106    }
107}
108
/// Configuration for `MessageBus` instances.
///
/// Instances can be created through the generated builder (`MessageBusConfig::builder()`),
/// through [`Default`], or by deserialization. The struct is annotated with `#[serde(default)]`,
/// so fields missing from the deserialized input take their [`Default`] values.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, bon::Builder)]
#[serde(default)]
pub struct MessageBusConfig {
    /// The configuration for the message bus backing database.
    pub database: Option<DatabaseConfig>,
    /// The encoding for database operations, controls the type of serializer used.
    /// Builder default is `SerializationEncoding::MsgPack`.
    #[builder(default = SerializationEncoding::MsgPack)]
    pub encoding: SerializationEncoding,
    /// If timestamps should be persisted as ISO 8601 strings.
    /// If `false` (the builder default), then timestamps will be persisted as UNIX nanoseconds.
    #[builder(default)]
    pub timestamps_as_iso8601: bool,
    /// The buffer interval (milliseconds) between pipelined/batched transactions.
    /// The recommended range if using buffered pipelining is [10, 1000] milliseconds,
    /// with a good compromise being 100 milliseconds.
    pub buffer_interval_ms: Option<u32>,
    /// The lookback window in minutes for automatic stream trimming.
    /// The actual window may extend up to one minute beyond the specified value since streams
    /// are trimmed at most once every minute.
    /// This feature requires Redis version 6.2 or higher; otherwise, it will result in a command
    /// syntax error.
    pub autotrim_mins: Option<u32>,
    /// If a 'trader-' prefix is used for stream names. Builder default is `true`.
    #[builder(default = true)]
    pub use_trader_prefix: bool,
    /// If the trader's ID is used for stream names. Builder default is `true`.
    #[builder(default = true)]
    pub use_trader_id: bool,
    /// If the trader's instance ID is used for stream names. Default is `false`.
    #[builder(default)]
    pub use_instance_id: bool,
    /// The prefix for externally published stream names. Must have a `database` config.
    /// Builder default is `"stream"`.
    #[builder(default = "stream".to_string())]
    pub streams_prefix: String,
    /// If `true` (the builder default), messages will be written to separate streams per topic.
    /// If `false`, all messages will be written to the same stream.
    #[builder(default = true)]
    pub stream_per_topic: bool,
    /// The external stream keys the message bus will listen to for publishing deserialized
    /// message payloads internally.
    pub external_streams: Option<Vec<String>>,
    /// A list of serializable types **not** to publish externally.
    pub types_filter: Option<Vec<String>>,
    /// The heartbeat interval (seconds).
    pub heartbeat_interval_secs: Option<u16>,
}
161
162impl Default for MessageBusConfig {
163    fn default() -> Self {
164        Self::builder().build()
165    }
166}
167
/// A generic message bus database facade.
///
/// The main operations take a consistent `key` and `payload` which should provide enough
/// information to implement the message bus database in many different technologies.
///
/// Delete operations may need a `payload` to target specific values.
// NOTE(review): the only data operation declared here is `publish(topic, payload)`; the trait
// has no `key` parameter and no delete operation, so the wording above may be carried over from
// a different facade. Confirm and reword.
pub trait MessageBusDatabaseAdapter {
    /// The type returned by [`Self::new`].
    type DatabaseType;

    /// Creates the database adapter for the given trader ID, instance ID and message bus
    /// configuration.
    ///
    /// Note that the result is [`Self::DatabaseType`], which is not required to be `Self`.
    ///
    /// # Errors
    ///
    /// Returns an error if initializing the database connection fails.
    fn new(
        trader_id: TraderId,
        instance_id: UUID4,
        config: MessageBusConfig,
    ) -> anyhow::Result<Self::DatabaseType>;
    /// Returns whether the adapter is closed.
    // NOTE(review): presumably `true` once `close` has been called; confirm against the
    // implementations.
    fn is_closed(&self) -> bool;
    /// Publishes `payload` under `topic`.
    ///
    /// The method returns nothing, so a failure cannot be reported to the caller through the
    /// return value.
    fn publish(&self, topic: Ustr, payload: Bytes);
    /// Closes the adapter.
    fn close(&mut self);
}
189
#[cfg(test)]
mod tests {
    //! Unit tests for the message bus configuration types.
    //!
    //! Covers default values, JSON deserialization (including the `type` alias and the
    //! `#[serde(default)]` fallback) and password masking in `Debug` output.

    use rstest::*;
    use serde_json::json;

    use super::*;

    #[rstest]
    fn test_default_database_config() {
        let config = DatabaseConfig::default();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, None);
        assert_eq!(config.port, None);
        assert_eq!(config.username, None);
        assert_eq!(config.password, None);
        assert!(!config.ssl);
        assert_eq!(config.connection_timeout, 20);
        assert_eq!(config.response_timeout, 20);
        assert_eq!(config.number_of_retries, 100);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 1000);
        assert_eq!(config.factor, 2);
    }

    #[rstest]
    fn test_deserialize_database_config() {
        let config_json = json!({
            "type": "redis",
            "host": "localhost",
            "port": 6379,
            "username": "user",
            "password": "pass",
            "ssl": true,
            "connection_timeout": 30,
            "response_timeout": 10,
            "number_of_retries": 3,
            "exponent_base": 2,
            "max_delay": 10,
            "factor": 2
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, Some("localhost".to_string()));
        assert_eq!(config.port, Some(6379));
        assert_eq!(config.username, Some("user".to_string()));
        assert_eq!(config.password, Some("pass".to_string()));
        assert!(config.ssl);
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.response_timeout, 10);
        assert_eq!(config.number_of_retries, 3);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 10);
        assert_eq!(config.factor, 2);
    }

    #[rstest]
    fn test_deserialize_partial_database_config_uses_defaults() {
        // Only `host` is supplied; `#[serde(default)]` must fill in everything else.
        let config_json = json!({ "host": "localhost" });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();

        let expected = DatabaseConfig {
            host: Some("localhost".to_string()),
            ..Default::default()
        };
        assert_eq!(config, expected);
    }

    #[rstest]
    fn test_debug_database_config_redacts_password() {
        let config = DatabaseConfig {
            password: Some("s3cr3t".to_string()),
            ..Default::default()
        };
        let output = format!("{config:?}");

        assert!(output.contains("password: Some(\"***\")"));
        assert!(!output.contains("s3cr3t"));
    }

    #[rstest]
    fn test_debug_database_config_without_password() {
        let config = DatabaseConfig::default();
        let output = format!("{config:?}");

        // An absent password is reported as absent, not as a masked value.
        assert!(output.contains("password: None"));
        assert!(!output.contains("***"));
    }

    #[rstest]
    fn test_default_message_bus_config() {
        let config = MessageBusConfig::default();
        assert_eq!(config.database, None);
        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
        assert!(!config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, None);
        assert_eq!(config.autotrim_mins, None);
        assert!(config.use_trader_prefix);
        assert!(config.use_trader_id);
        assert!(!config.use_instance_id);
        assert_eq!(config.streams_prefix, "stream");
        assert!(config.stream_per_topic);
        assert_eq!(config.external_streams, None);
        assert_eq!(config.types_filter, None);
        assert_eq!(config.heartbeat_interval_secs, None);
    }

    #[rstest]
    fn test_deserialize_message_bus_config() {
        let config_json = json!({
            "database": {
                "type": "redis",
                "host": "localhost",
                "port": 6379,
                "username": "user",
                "password": "pass",
                "ssl": true,
                "connection_timeout": 30,
                "response_timeout": 10,
                "number_of_retries": 3,
                "exponent_base": 2,
                "max_delay": 10,
                "factor": 2
            },
            "encoding": "json",
            "timestamps_as_iso8601": true,
            "buffer_interval_ms": 100,
            "autotrim_mins": 60,
            "use_trader_prefix": false,
            "use_trader_id": false,
            "use_instance_id": true,
            "streams_prefix": "data_streams",
            "stream_per_topic": false,
            "external_streams": ["stream1", "stream2"],
            "types_filter": ["type1", "type2"]
        });
        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();

        // The nested database section must be deserialized, not silently dropped.
        let expected_database = DatabaseConfig {
            database_type: "redis".to_string(),
            host: Some("localhost".to_string()),
            port: Some(6379),
            username: Some("user".to_string()),
            password: Some("pass".to_string()),
            ssl: true,
            connection_timeout: 30,
            response_timeout: 10,
            number_of_retries: 3,
            exponent_base: 2,
            max_delay: 10,
            factor: 2,
        };
        assert_eq!(config.database, Some(expected_database));

        assert_eq!(config.encoding, SerializationEncoding::Json);
        assert!(config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, Some(100));
        assert_eq!(config.autotrim_mins, Some(60));
        assert!(!config.use_trader_prefix);
        assert!(!config.use_trader_id);
        assert!(config.use_instance_id);
        assert_eq!(config.streams_prefix, "data_streams");
        assert!(!config.stream_per_topic);
        assert_eq!(
            config.external_streams,
            Some(vec!["stream1".to_string(), "stream2".to_string()])
        );
        assert_eq!(
            config.types_filter,
            Some(vec!["type1".to_string(), "type2".to_string()])
        );
        // Not present in the input, so the `#[serde(default)]` fallback applies.
        assert_eq!(config.heartbeat_interval_secs, None);
    }
}