1#[cfg(feature = "python")]
17use std::collections::HashSet;
18#[cfg(feature = "python")]
19use std::sync::RwLock;
20use std::{any::Any, fmt::Debug, sync::Arc};
21
22use nautilus_core::UnixNanos;
23#[cfg(feature = "python")]
24use pyo3::{IntoPyObjectExt, prelude::*, types::PyAny};
25use serde::{Serialize, Serializer};
26
27use crate::data::{
28 Data, DataType, HasTsInit,
29 registry::{ensure_json_deserializer_registered, register_json_deserializer},
30};
31
32#[cfg(feature = "python")]
/// Interns `name` and returns a `&'static str` shared by every caller that
/// passes an equal string.
///
/// The first call for a given name leaks one boxed copy of it (via
/// [`Box::leak`]) and records that slice in a process-wide set; later calls
/// with an equal name return the recorded slice instead of leaking again.
///
/// A poisoned lock is recovered instead of bypassed. The set only ever holds
/// names that were already leaked, so its contents stay usable, and
/// recovering keeps repeated calls from leaking a new copy each time.
fn intern_type_name_static(name: String) -> &'static str {
    static INTERNER: std::sync::OnceLock<RwLock<HashSet<&'static str>>> =
        std::sync::OnceLock::new();
    let set = INTERNER.get_or_init(|| RwLock::new(HashSet::new()));

    // Fast path under the shared lock, with a single lookup. The guard lives
    // in its own scope so it is released before the write lock is requested.
    {
        let guard = set
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(&existing) = guard.get(name.as_str()) {
            return existing;
        }
    }

    let mut guard = set
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Another thread may have interned the same name between the two locks.
    if let Some(&existing) = guard.get(name.as_str()) {
        return existing;
    }

    let leaked: &'static str = Box::leak(name.into_boxed_str());
    guard.insert(leaked);
    leaked
}
59
/// Holds a Python custom data object together with values read from it when
/// the wrapper was constructed.
#[cfg(feature = "python")]
pub struct PythonCustomDataWrapper {
    /// The wrapped Python object.
    py_object: Py<PyAny>,
    /// Value of the object's `ts_event` attribute at construction time.
    cached_ts_event: UnixNanos,
    /// Value of the object's `ts_init` attribute at construction time.
    cached_ts_init: UnixNanos,
    /// Owned copy of the type name resolved at construction time.
    cached_type_name: String,
    /// The same type name as an interned `&'static str`.
    cached_type_name_static: &'static str,
}
79
#[cfg(feature = "python")]
impl PythonCustomDataWrapper {
    /// Builds a wrapper around `py_object`, caching its timestamps and type name.
    ///
    /// `ts_event` and `ts_init` are read as `u64` attributes. The type name
    /// comes from the class's `type_name_static()` when the class has that
    /// attribute, otherwise from the class's `__name__`.
    ///
    /// # Errors
    ///
    /// Returns the Python error if an attribute lookup, the
    /// `type_name_static()` call, or a value extraction fails.
    pub fn new(_py: Python<'_>, py_object: &Bound<'_, PyAny>) -> PyResult<Self> {
        let raw_ts_event: u64 = py_object.getattr("ts_event")?.extract()?;
        let cached_ts_event = UnixNanos::from(raw_ts_event);

        let raw_ts_init: u64 = py_object.getattr("ts_init")?.extract()?;
        let cached_ts_init = UnixNanos::from(raw_ts_init);

        let class = py_object.get_type();
        let name_obj = if class.hasattr("type_name_static")? {
            class.call_method0("type_name_static")?
        } else {
            class.getattr("__name__")?
        };
        let cached_type_name: String = name_obj.extract()?;

        // The interner takes ownership, so hand it a copy and keep the original.
        let cached_type_name_static: &'static str =
            intern_type_name_static(cached_type_name.clone());

        Ok(Self {
            py_object: py_object.clone().unbind(),
            cached_ts_event,
            cached_ts_init,
            cached_type_name,
            cached_type_name_static,
        })
    }

    /// Returns a reference to the wrapped Python object.
    #[must_use]
    pub fn py_object(&self) -> &Py<PyAny> {
        &self.py_object
    }

    /// Returns the type name cached at construction.
    #[must_use]
    pub fn get_type_name(&self) -> &str {
        self.cached_type_name.as_str()
    }
}
129
#[cfg(feature = "python")]
impl Clone for PythonCustomDataWrapper {
    /// Clones the wrapper, taking a new reference to the Python object inside
    /// `Python::attach` and copying the cached values.
    fn clone(&self) -> Self {
        Python::attach(|py| {
            let py_object = self.py_object.clone_ref(py);
            let cached_type_name = self.cached_type_name.clone();
            Self {
                py_object,
                cached_ts_event: self.cached_ts_event,
                cached_ts_init: self.cached_ts_init,
                cached_type_name,
                cached_type_name_static: self.cached_type_name_static,
            }
        })
    }
}
142
#[cfg(feature = "python")]
impl Debug for PythonCustomDataWrapper {
    /// Formats the cached type name and timestamps; the Python object itself
    /// is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            cached_type_name,
            cached_ts_event,
            cached_ts_init,
            ..
        } = self;

        let mut builder = f.debug_struct(stringify!(PythonCustomDataWrapper));
        builder.field("type_name", cached_type_name);
        builder.field("ts_event", cached_ts_event);
        builder.field("ts_init", cached_ts_init);
        builder.finish()
    }
}
153
#[cfg(feature = "python")]
impl HasTsInit for PythonCustomDataWrapper {
    /// Returns the `ts_init` value cached when the wrapper was constructed.
    fn ts_init(&self) -> UnixNanos {
        let Self { cached_ts_init, .. } = self;
        *cached_ts_init
    }
}
160
#[cfg(feature = "python")]
impl CustomDataTrait for PythonCustomDataWrapper {
    /// Returns the interned type name cached at construction.
    fn type_name(&self) -> &'static str {
        self.cached_type_name_static
    }

    /// Exposes the wrapper as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the `ts_event` value cached at construction.
    fn ts_event(&self) -> UnixNanos {
        self.cached_ts_event
    }

    /// Serializes the wrapped Python object to a JSON string.
    ///
    /// Uses the object's own `to_json()` when it has one; otherwise passes
    /// the object's `__dict__` to Python's `json.dumps`.
    ///
    /// # Errors
    ///
    /// Returns an error if the object has neither `to_json` nor `__dict__`,
    /// or if any Python call or string extraction fails.
    fn to_json(&self) -> anyhow::Result<String> {
        Python::attach(|py| -> anyhow::Result<String> {
            let target = self.py_object.bind(py);

            if target.hasattr("to_json")? {
                let text: String = target.call_method0("to_json")?.extract()?;
                return Ok(text);
            }

            let json_module = py.import("json")?;
            if !target.hasattr("__dict__")? {
                anyhow::bail!("Python object has no to_json() method or __dict__ attribute");
            }
            let attrs = target.getattr("__dict__")?;
            let text: String = json_module.call_method1("dumps", (attrs,))?.extract()?;
            Ok(text)
        })
    }

    /// Returns a clone of this wrapper behind a new `Arc`.
    fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
        let copy = self.clone();
        Arc::new(copy)
    }

    /// Compares this wrapper with `other`.
    ///
    /// Returns `false` when `other` is not a `PythonCustomDataWrapper`.
    /// Otherwise the result is `true` when both wrap the same Python object,
    /// or when the Python-side `eq` comparison succeeds and reports equality;
    /// a failed comparison counts as not equal.
    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
        let Some(other_wrapper) = other.as_any().downcast_ref::<Self>() else {
            return false;
        };

        Python::attach(|py| {
            let lhs = self.py_object.bind(py);
            let rhs = other_wrapper.py_object.bind(py);
            // The identity check short-circuits, so `eq` is only called for
            // distinct objects.
            lhs.is(rhs) || lhs.eq(rhs).unwrap_or(false)
        })
    }

    /// Returns a new reference to the wrapped Python object.
    fn to_pyobject(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        let object = self.py_object.clone_ref(py);
        Ok(object)
    }
}
223
/// Returns the process-wide map from type name to Python class object,
/// creating the empty map on first use.
#[cfg(feature = "python")]
fn python_data_classes() -> &'static dashmap::DashMap<String, Py<PyAny>> {
    type Registry = dashmap::DashMap<String, Py<PyAny>>;

    static PYTHON_DATA_CLASSES: std::sync::OnceLock<Registry> = std::sync::OnceLock::new();

    PYTHON_DATA_CLASSES.get_or_init(|| dashmap::DashMap::new())
}
230
/// Stores `data_class` in the Python class map under `type_name`.
///
/// The map's `insert` is used, so the value stored under an existing key is
/// the one passed here.
#[cfg(feature = "python")]
pub fn register_python_data_class(type_name: &str, data_class: &Bound<'_, PyAny>) {
    let key = type_name.to_owned();
    let class = data_class.clone().unbind();
    python_data_classes().insert(key, class);
}
235
/// Looks up the Python class stored under `type_name`.
///
/// Returns a new reference to the class, or `None` when nothing is stored
/// under that name.
#[cfg(feature = "python")]
pub fn get_python_data_class(py: Python<'_>, type_name: &str) -> Option<Py<PyAny>> {
    let entry = python_data_classes().get(type_name)?;
    Some(entry.value().clone_ref(py))
}
242
/// Rebuilds a Python custom data object from its JSON text.
///
/// The class stored under `type_name` is looked up, `json` is parsed with
/// Python's `json.loads`, and the parsed payload is passed to the class's
/// `from_json` method.
///
/// # Errors
///
/// Returns a runtime error when no class is stored under `type_name`, or the
/// Python error raised by the import, `json.loads`, or `from_json`.
#[cfg(feature = "python")]
pub fn reconstruct_python_custom_data(
    py: Python<'_>,
    type_name: &str,
    json: &str,
) -> PyResult<Py<PyAny>> {
    let Some(data_class) = get_python_data_class(py, type_name) else {
        return Err(nautilus_core::python::to_pyruntime_err(format!(
            "No registered Python class for custom data type `{type_name}`"
        )));
    };

    let json_module = py.import("json")?;
    let payload = json_module.call_method1("loads", (json,))?;

    let instance = data_class.bind(py).call_method1("from_json", (payload,))?;
    Ok(instance.unbind())
}
266
/// Clones `value` and converts the clone into a Python object.
///
/// # Errors
///
/// Returns the error produced by the conversion.
#[cfg(feature = "python")]
pub fn clone_pyclass_to_pyobject<T>(value: &T, py: Python<'_>) -> PyResult<Py<PyAny>>
where
    T: Clone,
    for<'py> T: pyo3::IntoPyObject<'py, Error = pyo3::PyErr>,
{
    let owned: T = T::clone(value);
    owned.into_py_any(py)
}
283
284pub trait CustomDataTrait: HasTsInit + Send + Sync + Debug {
286 fn type_name(&self) -> &'static str;
288
289 fn as_any(&self) -> &dyn Any;
291
292 fn ts_event(&self) -> UnixNanos;
294
295 fn to_json(&self) -> anyhow::Result<String>;
300
301 fn to_json_py(&self) -> anyhow::Result<String> {
307 self.to_json()
308 }
309
310 fn clone_arc(&self) -> Arc<dyn CustomDataTrait>;
312
313 fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool;
315
316 #[cfg(feature = "python")]
321 fn to_pyobject(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
322 Err(nautilus_core::python::to_pytype_err(format!(
323 "to_pyobject not implemented for {}",
324 self.type_name()
325 )))
326 }
327
328 fn type_name_static() -> &'static str
330 where
331 Self: Sized,
332 {
333 std::any::type_name::<Self>()
334 }
335
336 fn from_json(_value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>>
341 where
342 Self: Sized,
343 {
344 anyhow::bail!(
345 "from_json not implemented for {}",
346 std::any::type_name::<Self>()
347 )
348 }
349}
350
351pub fn register_custom_data_json<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
357 let type_name = T::type_name_static();
358 register_json_deserializer(type_name, Box::new(|value| T::from_json(value)))
359}
360
361pub fn ensure_custom_data_json_registered<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
367 let type_name = T::type_name_static();
368 ensure_json_deserializer_registered(type_name, Box::new(|value| T::from_json(value)))
369}
370
371#[cfg_attr(
377 feature = "python",
378 pyclass(
379 module = "nautilus_trader.core.nautilus_pyo3.model",
380 name = "CustomData",
381 from_py_object
382 )
383)]
384#[cfg_attr(
385 feature = "python",
386 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
387)]
388#[derive(Clone, Debug)]
389pub struct CustomData {
390 pub data: Arc<dyn CustomDataTrait>,
392 pub data_type: DataType,
394}
395
396impl CustomData {
397 pub fn from_arc(arc: Arc<dyn CustomDataTrait>) -> Self {
400 let data_type = DataType::new(arc.type_name(), None, None);
401 Self {
402 data: arc,
403 data_type,
404 }
405 }
406
407 pub fn new(data: Arc<dyn CustomDataTrait>, data_type: DataType) -> Self {
412 Self { data, data_type }
413 }
414}
415
416impl PartialEq for CustomData {
417 fn eq(&self, other: &Self) -> bool {
418 self.data.eq_arc(other.data.as_ref()) && self.data_type == other.data_type
419 }
420}
421
422impl HasTsInit for CustomData {
423 fn ts_init(&self) -> UnixNanos {
424 self.data.ts_init()
425 }
426}
427
428pub(crate) fn parse_custom_data_from_json_bytes(
429 bytes: &[u8],
430) -> Result<CustomData, serde_json::Error> {
431 let data: Data = serde_json::from_slice(bytes)?;
432 match data {
433 Data::Custom(custom) => Ok(custom),
434 _ => Err(serde_json::Error::io(std::io::Error::new(
435 std::io::ErrorKind::InvalidData,
436 "JSON does not represent CustomData",
437 ))),
438 }
439}
440
441impl CustomData {
442 pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
448 parse_custom_data_from_json_bytes(bytes)
449 }
450}
451
452struct CustomDataEnvelope {
455 type_name: String,
456 data_type: serde_json::Value,
457 payload: serde_json::Value,
458}
459
460impl Serialize for CustomDataEnvelope {
461 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
462 where
463 S: Serializer,
464 {
465 use serde::ser::SerializeStruct;
466 let mut state = serializer.serialize_struct("CustomDataEnvelope", 3)?;
467 state.serialize_field("type", &self.type_name)?;
468 state.serialize_field("data_type", &self.data_type)?;
469 state.serialize_field("payload", &self.payload)?;
470 state.end()
471 }
472}
473
474impl CustomData {
475 fn to_envelope_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
476 let json = self.data.to_json().map_err(|e| {
477 serde_json::Error::io(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
478 })?;
479 let payload: serde_json::Value = serde_json::from_str(&json)?;
480 let metadata_value = self.data_type.metadata().map_or(
481 serde_json::Value::Object(serde_json::Map::new()),
482 |m| {
483 serde_json::to_value(m).unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
484 },
485 );
486 let mut data_type_obj = serde_json::Map::new();
487 data_type_obj.insert(
488 "type_name".to_string(),
489 serde_json::Value::String(self.data_type.type_name().to_string()),
490 );
491 data_type_obj.insert("metadata".to_string(), metadata_value);
492
493 if let Some(id) = self.data_type.identifier() {
494 data_type_obj.insert(
495 "identifier".to_string(),
496 serde_json::Value::String(id.to_string()),
497 );
498 }
499
500 let envelope = CustomDataEnvelope {
501 type_name: self.data.type_name().to_string(),
502 data_type: serde_json::Value::Object(data_type_obj),
503 payload,
504 };
505 serde_json::to_value(envelope)
506 }
507}
508
509impl Serialize for CustomData {
510 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
511 where
512 S: Serializer,
513 {
514 let value = self
515 .to_envelope_json_value()
516 .map_err(serde::ser::Error::custom)?;
517 value.serialize(serializer)
518 }
519}
520
#[cfg(test)]
mod tests {
    use nautilus_core::{Params, UnixNanos};
    use rstest::rstest;
    use serde::Deserialize;
    use serde_json::json;

    use super::*;
    use crate::{data::HasTsInit, identifiers::InstrumentId};

    /// Minimal custom data type used to exercise [`CustomData`].
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestCustomData {
        ts_init: UnixNanos,
        instrument_id: InstrumentId,
    }

    impl HasTsInit for TestCustomData {
        fn ts_init(&self) -> UnixNanos {
            self.ts_init
        }
    }

    impl CustomDataTrait for TestCustomData {
        fn type_name(&self) -> &'static str {
            "TestCustomData"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        // The test type has a single timestamp, used for both accessors.
        fn ts_event(&self) -> UnixNanos {
            self.ts_init
        }

        fn to_json(&self) -> anyhow::Result<String> {
            let text = serde_json::to_string(self)?;
            Ok(text)
        }

        fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
            Arc::new(self.clone())
        }

        fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
            match other.as_any().downcast_ref::<Self>() {
                Some(other) => self == other,
                None => false,
            }
        }

        fn type_name_static() -> &'static str {
            "TestCustomData"
        }

        fn from_json(value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>> {
            let parsed = serde_json::from_value::<Self>(value)?;
            Ok(Arc::new(parsed))
        }
    }

    /// Serializing and then parsing a `CustomData` preserves the data type
    /// fields and the inner payload.
    #[rstest]
    fn test_custom_data_json_roundtrip() {
        register_custom_data_json::<TestCustomData>()
            .expect("TestCustomData must register for JSON roundtrip test");

        let instrument_id = InstrumentId::from("TEST.SIM");
        let metadata = Some(
            serde_json::from_value::<Params>(json!({"key1": "value1", "key2": "value2"})).unwrap(),
        );
        let payload = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new("TestCustomData", metadata, Some(instrument_id.to_string()));
        let original = CustomData::new(Arc::new(payload), data_type);

        let encoded = serde_json::to_vec(&original).unwrap();
        let decoded = CustomData::from_json_bytes(&encoded).unwrap();

        assert_eq!(
            decoded.data_type.type_name(),
            original.data_type.type_name()
        );
        assert_eq!(decoded.data_type.metadata(), original.data_type.metadata());
        assert_eq!(
            decoded.data_type.identifier(),
            original.data_type.identifier()
        );

        let expected_inner = original
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        let actual_inner = decoded
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        assert_eq!(expected_inner, actual_inner);
    }

    /// `CustomData` exposes the payload's `ts_init`, and `Data::Custom`
    /// reports the expected instrument id.
    #[rstest]
    fn test_custom_data_wrapper() {
        let instrument_id = InstrumentId::from("TEST.SIM");
        let payload = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new("TestCustomData", None, Some(instrument_id.to_string()));
        let custom_data = CustomData::new(Arc::new(payload), data_type);

        assert_eq!(custom_data.data.ts_init(), UnixNanos::from(100));
        assert_eq!(Data::Custom(custom_data).instrument_id(), instrument_id);
    }
}