1use std::fmt::Debug;
17
18use bytes::Bytes;
19use nautilus_core::UUID4;
20use nautilus_model::identifiers::TraderId;
21use serde::{Deserialize, Serialize};
22use ustr::Ustr;
23
24use crate::enums::SerializationEncoding;
25
26#[cfg_attr(
32 feature = "python",
33 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
34)]
35#[cfg_attr(
36 feature = "python",
37 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
38)]
39#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(default)]
41pub struct DatabaseConfig {
42 #[serde(alias = "type")]
44 pub database_type: String,
45 pub host: Option<String>,
47 pub port: Option<u16>,
49 pub username: Option<String>,
51 pub password: Option<String>,
53 pub ssl: bool,
55 pub connection_timeout: u16,
57 pub response_timeout: u16,
59 pub number_of_retries: usize,
61 pub exponent_base: u64,
63 pub max_delay: u64,
65 pub factor: u64,
67}
68
69impl Debug for DatabaseConfig {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let redacted = self.password.as_ref().map(|_| "***");
72 f.debug_struct(stringify!(DatabaseConfig))
73 .field("database_type", &self.database_type)
74 .field("host", &self.host)
75 .field("port", &self.port)
76 .field("username", &self.username)
77 .field("password", &redacted)
78 .field("ssl", &self.ssl)
79 .field("connection_timeout", &self.connection_timeout)
80 .field("response_timeout", &self.response_timeout)
81 .field("number_of_retries", &self.number_of_retries)
82 .field("exponent_base", &self.exponent_base)
83 .field("max_delay", &self.max_delay)
84 .field("factor", &self.factor)
85 .finish()
86 }
87}
88
89impl Default for DatabaseConfig {
90 fn default() -> Self {
92 Self {
93 database_type: "redis".to_string(),
94 host: None,
95 port: None,
96 username: None,
97 password: None,
98 ssl: false,
99 connection_timeout: 20,
100 response_timeout: 20,
101 number_of_retries: 100,
102 exponent_base: 2,
103 max_delay: 1000,
104 factor: 2,
105 }
106 }
107}
108
109#[cfg_attr(
111 feature = "python",
112 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
113)]
114#[cfg_attr(
115 feature = "python",
116 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
117)]
118#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, bon::Builder)]
119#[serde(default)]
120pub struct MessageBusConfig {
121 pub database: Option<DatabaseConfig>,
123 #[builder(default = SerializationEncoding::MsgPack)]
125 pub encoding: SerializationEncoding,
126 #[builder(default)]
129 pub timestamps_as_iso8601: bool,
130 pub buffer_interval_ms: Option<u32>,
134 pub autotrim_mins: Option<u32>,
138 #[builder(default = true)]
140 pub use_trader_prefix: bool,
141 #[builder(default = true)]
143 pub use_trader_id: bool,
144 #[builder(default)]
146 pub use_instance_id: bool,
147 #[builder(default = "stream".to_string())]
149 pub streams_prefix: String,
150 #[builder(default = true)]
153 pub stream_per_topic: bool,
154 pub external_streams: Option<Vec<String>>,
156 pub types_filter: Option<Vec<String>>,
158 pub heartbeat_interval_secs: Option<u16>,
160}
161
162impl Default for MessageBusConfig {
163 fn default() -> Self {
164 Self::builder().build()
165 }
166}
167
168pub trait MessageBusDatabaseAdapter {
175 type DatabaseType;
176
177 fn new(
181 trader_id: TraderId,
182 instance_id: UUID4,
183 config: MessageBusConfig,
184 ) -> anyhow::Result<Self::DatabaseType>;
185 fn is_closed(&self) -> bool;
186 fn publish(&self, topic: Ustr, payload: Bytes);
187 fn close(&mut self);
188}
189
#[cfg(test)]
mod tests {
    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Pins every default value of `DatabaseConfig`.
    #[rstest]
    fn test_default_database_config() {
        let config = DatabaseConfig::default();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, None);
        assert_eq!(config.port, None);
        assert_eq!(config.username, None);
        assert_eq!(config.password, None);
        assert!(!config.ssl);
        assert_eq!(config.connection_timeout, 20);
        assert_eq!(config.response_timeout, 20);
        assert_eq!(config.number_of_retries, 100);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 1000);
        assert_eq!(config.factor, 2);
    }

    /// A fully specified JSON object populates every field, with `type`
    /// accepted as an alias for `database_type`.
    #[rstest]
    fn test_deserialize_database_config() {
        let config_json = json!({
            "type": "redis",
            "host": "localhost",
            "port": 6379,
            "username": "user",
            "password": "pass",
            "ssl": true,
            "connection_timeout": 30,
            "response_timeout": 10,
            "number_of_retries": 3,
            "exponent_base": 2,
            "max_delay": 10,
            "factor": 2
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, Some("localhost".to_string()));
        assert_eq!(config.port, Some(6379));
        assert_eq!(config.username, Some("user".to_string()));
        assert_eq!(config.password, Some("pass".to_string()));
        assert!(config.ssl);
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.response_timeout, 10);
        assert_eq!(config.number_of_retries, 3);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 10);
        assert_eq!(config.factor, 2);
    }

    /// Fields missing from the input fall back to `Default` because of the
    /// container-level `#[serde(default)]`.
    #[rstest]
    fn test_deserialize_database_config_partial_uses_defaults() {
        let config: DatabaseConfig =
            serde_json::from_value(json!({ "host": "localhost" })).unwrap();
        let expected = DatabaseConfig {
            host: Some("localhost".to_string()),
            ..Default::default()
        };
        assert_eq!(config, expected);
    }

    /// The hand-written `Debug` impl must never leak the password.
    #[rstest]
    fn test_database_config_debug_redacts_password() {
        // The secret deliberately shares no substring with the field name
        // `password`, so the negative assertion below is meaningful.
        let config = DatabaseConfig {
            password: Some("s3cr3t".to_string()),
            ..Default::default()
        };
        let output = format!("{config:?}");
        assert!(output.contains("password: Some(\"***\")"));
        assert!(!output.contains("s3cr3t"));

        let output = format!("{:?}", DatabaseConfig::default());
        assert!(output.contains("password: None"));
    }

    /// Pins every default value of `MessageBusConfig`.
    #[rstest]
    fn test_default_message_bus_config() {
        let config = MessageBusConfig::default();
        assert_eq!(config.database, None);
        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
        assert!(!config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, None);
        assert_eq!(config.autotrim_mins, None);
        assert!(config.use_trader_prefix);
        assert!(config.use_trader_id);
        assert!(!config.use_instance_id);
        assert_eq!(config.streams_prefix, "stream");
        assert!(config.stream_per_topic);
        assert_eq!(config.external_streams, None);
        assert_eq!(config.types_filter, None);
        assert_eq!(config.heartbeat_interval_secs, None);
    }

    /// A fully specified JSON object populates every field, including the
    /// nested database configuration.
    #[rstest]
    fn test_deserialize_message_bus_config() {
        let config_json = json!({
            "database": {
                "type": "redis",
                "host": "localhost",
                "port": 6379,
                "username": "user",
                "password": "pass",
                "ssl": true,
                "connection_timeout": 30,
                "response_timeout": 10,
                "number_of_retries": 3,
                "exponent_base": 2,
                "max_delay": 10,
                "factor": 2
            },
            "encoding": "json",
            "timestamps_as_iso8601": true,
            "buffer_interval_ms": 100,
            "autotrim_mins": 60,
            "use_trader_prefix": false,
            "use_trader_id": false,
            "use_instance_id": true,
            "streams_prefix": "data_streams",
            "stream_per_topic": false,
            "external_streams": ["stream1", "stream2"],
            "types_filter": ["type1", "type2"]
        });
        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();

        let database = config.database.as_ref().unwrap();
        assert_eq!(database.database_type, "redis");
        assert_eq!(database.host, Some("localhost".to_string()));
        assert_eq!(database.port, Some(6379));
        assert_eq!(database.username, Some("user".to_string()));
        assert_eq!(database.password, Some("pass".to_string()));
        assert!(database.ssl);
        assert_eq!(database.connection_timeout, 30);
        assert_eq!(database.response_timeout, 10);
        assert_eq!(database.number_of_retries, 3);
        assert_eq!(database.exponent_base, 2);
        assert_eq!(database.max_delay, 10);
        assert_eq!(database.factor, 2);

        assert_eq!(config.encoding, SerializationEncoding::Json);
        assert!(config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, Some(100));
        assert_eq!(config.autotrim_mins, Some(60));
        assert!(!config.use_trader_prefix);
        assert!(!config.use_trader_id);
        assert!(config.use_instance_id);
        assert_eq!(config.streams_prefix, "data_streams");
        assert!(!config.stream_per_topic);
        assert_eq!(
            config.external_streams,
            Some(vec!["stream1".to_string(), "stream2".to_string()])
        );
        assert_eq!(
            config.types_filter,
            Some(vec!["type1".to_string(), "type2".to_string()])
        );
        // Not present in the input, so it falls back to the default.
        assert_eq!(config.heartbeat_interval_secs, None);
    }

    /// An empty JSON object deserializes to the default configuration.
    #[rstest]
    fn test_deserialize_message_bus_config_empty_uses_defaults() {
        let config: MessageBusConfig = serde_json::from_value(json!({})).unwrap();
        assert_eq!(config, MessageBusConfig::default());
    }
}