1#[cfg(feature = "python")]
17use std::collections::HashSet;
18#[cfg(feature = "python")]
19use std::sync::RwLock;
20use std::{any::Any, fmt::Debug, sync::Arc};
21
22use nautilus_core::UnixNanos;
23#[cfg(feature = "python")]
24use pyo3::{IntoPyObjectExt, prelude::*, types::PyAny};
25use serde::{Serialize, Serializer};
26
27use crate::data::{
28 Data, DataType, HasTsInit,
29 registry::{ensure_json_deserializer_registered, register_json_deserializer},
30};
31
32#[cfg(feature = "python")]
33fn intern_type_name_static(name: String) -> &'static str {
34 static INTERNER: std::sync::OnceLock<RwLock<HashSet<&'static str>>> =
35 std::sync::OnceLock::new();
36 let set = INTERNER.get_or_init(|| RwLock::new(HashSet::new()));
37
38 if let Ok(guard) = set.read()
39 && guard.contains(name.as_str())
40 {
41 return guard.get(name.as_str()).copied().unwrap();
42 }
43
44 if let Ok(mut guard) = set.write() {
45 if let Some(&existing) = guard.get(name.as_str()) {
46 return existing;
47 }
48 let leaked: &'static str = Box::leak(name.into_boxed_str());
49 guard.insert(leaked);
50 leaked
51 } else {
52 log::warn!("intern_type_name_static: RwLock poisoned, interning skipped for type name");
53 Box::leak(name.into_boxed_str())
54 }
55}
56
/// Wraps a Python custom-data object together with values read from it once at
/// construction time.
///
/// The cached timestamps and type name let the Rust-side accessors on this type return
/// their values without calling back into Python.
#[cfg(feature = "python")]
pub struct PythonCustomDataWrapper {
    /// The wrapped Python object.
    py_object: Py<PyAny>,
    /// Value of the object's `ts_event` attribute, captured in `new`.
    cached_ts_event: UnixNanos,
    /// Value of the object's `ts_init` attribute, captured in `new`.
    cached_ts_init: UnixNanos,
    /// Type name resolved in `new` (owned copy).
    cached_type_name: String,
    /// The same type name as an interned `&'static str`.
    cached_type_name_static: &'static str,
}
76
#[cfg(feature = "python")]
impl PythonCustomDataWrapper {
    /// Builds a wrapper around `py_object`, caching its timestamps and type name.
    ///
    /// The type name comes from calling `type_name_static()` on the object's class when
    /// the class has that attribute, and from the class `__name__` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if `ts_event` or `ts_init` is missing or cannot be extracted as
    /// `u64`, or if the type name lookup fails or does not yield a string.
    pub fn new(_py: Python<'_>, py_object: &Bound<'_, PyAny>) -> PyResult<Self> {
        let raw_ts_event: u64 = py_object.getattr("ts_event")?.extract()?;
        let cached_ts_event = UnixNanos::from(raw_ts_event);

        let raw_ts_init: u64 = py_object.getattr("ts_init")?.extract()?;
        let cached_ts_init = UnixNanos::from(raw_ts_init);

        // Resolve the name on the class, not the instance.
        let py_class = py_object.get_type();
        let name_obj = if py_class.hasattr("type_name_static")? {
            py_class.call_method0("type_name_static")?
        } else {
            py_class.getattr("__name__")?
        };
        let cached_type_name: String = name_obj.extract()?;

        // The interner consumes its argument, so hand it a copy and keep the original.
        let cached_type_name_static = intern_type_name_static(cached_type_name.clone());

        Ok(Self {
            py_object: py_object.clone().unbind(),
            cached_ts_event,
            cached_ts_init,
            cached_type_name,
            cached_type_name_static,
        })
    }

    /// Returns a reference to the wrapped Python object.
    #[must_use]
    pub fn py_object(&self) -> &Py<PyAny> {
        &self.py_object
    }

    /// Returns the type name cached at construction.
    #[must_use]
    pub fn get_type_name(&self) -> &str {
        self.cached_type_name.as_str()
    }
}
126
#[cfg(feature = "python")]
impl Clone for PythonCustomDataWrapper {
    /// Clones the wrapper; the Python object is shared through a new reference
    /// (`clone_ref`), not copied.
    fn clone(&self) -> Self {
        // Only the Python reference needs the interpreter to be attached.
        let py_object = Python::attach(|py| self.py_object.clone_ref(py));

        Self {
            py_object,
            cached_ts_event: self.cached_ts_event,
            cached_ts_init: self.cached_ts_init,
            cached_type_name: self.cached_type_name.clone(),
            cached_type_name_static: self.cached_type_name_static,
        }
    }
}
139
#[cfg(feature = "python")]
impl Debug for PythonCustomDataWrapper {
    /// Formats the wrapper as a struct listing the Python object and all cached values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            py_object,
            cached_ts_event,
            cached_ts_init,
            cached_type_name,
            cached_type_name_static,
        } = self;

        let mut out = f.debug_struct(stringify!(PythonCustomDataWrapper));
        out.field("py_object", py_object);
        out.field("type_name", cached_type_name);
        out.field("type_name_static", cached_type_name_static);
        out.field("ts_event", cached_ts_event);
        out.field("ts_init", cached_ts_init);
        out.finish()
    }
}
152
#[cfg(feature = "python")]
impl HasTsInit for PythonCustomDataWrapper {
    /// Returns the `ts_init` value cached at construction; does not call into Python.
    fn ts_init(&self) -> UnixNanos {
        self.cached_ts_init
    }
}
159
#[cfg(feature = "python")]
impl CustomDataTrait for PythonCustomDataWrapper {
    /// Returns the interned type name cached at construction.
    fn type_name(&self) -> &'static str {
        self.cached_type_name_static
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the `ts_event` value cached at construction.
    fn ts_event(&self) -> UnixNanos {
        self.cached_ts_event
    }

    /// Serializes the wrapped Python object to a JSON string.
    ///
    /// Uses the object's own `to_json()` method when present; otherwise passes the
    /// object's `__dict__` to Python's `json.dumps`.
    ///
    /// # Errors
    ///
    /// Returns an error if the object has neither `to_json` nor `__dict__`, or if any
    /// Python call or string extraction fails.
    fn to_json(&self) -> anyhow::Result<String> {
        Python::attach(|py| {
            let instance = self.py_object.bind(py);

            // Prefer the object's own serializer.
            if instance.hasattr("to_json")? {
                let encoded: String = instance.call_method0("to_json")?.extract()?;
                return Ok(encoded);
            }

            let json_module = py.import("json")?;
            if !instance.hasattr("__dict__")? {
                anyhow::bail!("Python object has no to_json() method or __dict__ attribute");
            }
            let attributes = instance.getattr("__dict__")?;
            let encoded: String = json_module
                .call_method1("dumps", (attributes,))?
                .extract()?;
            Ok(encoded)
        })
    }

    fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
        Arc::new(self.clone())
    }

    /// Compares the wrapped Python objects.
    ///
    /// Returns `false` when `other` is not a `PythonCustomDataWrapper`. Identical
    /// objects compare equal without invoking Python `==`; a Python comparison that
    /// raises is treated as "not equal".
    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
        let Some(rhs) = other.as_any().downcast_ref::<Self>() else {
            return false;
        };

        Python::attach(|py| {
            let lhs_obj = self.py_object.bind(py);
            let rhs_obj = rhs.py_object.bind(py);
            lhs_obj.is(rhs_obj) || lhs_obj.eq(rhs_obj).unwrap_or(false)
        })
    }

    /// Returns a new reference to the wrapped Python object.
    fn to_pyobject(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        let handle = self.py_object.clone_ref(py);
        Ok(handle)
    }
}
222
/// Returns the process-wide map from type name to registered Python class, creating it
/// empty on first use.
#[cfg(feature = "python")]
fn python_data_classes() -> &'static dashmap::DashMap<String, Py<PyAny>> {
    use std::sync::OnceLock;

    type ClassRegistry = dashmap::DashMap<String, Py<PyAny>>;

    static REGISTRY: OnceLock<ClassRegistry> = OnceLock::new();
    REGISTRY.get_or_init(ClassRegistry::new)
}
229
/// Stores `data_class` in the Python class registry under `type_name`, replacing any
/// class previously stored under the same name.
#[cfg(feature = "python")]
pub fn register_python_data_class(type_name: &str, data_class: &Bound<'_, PyAny>) {
    let key = type_name.to_owned();
    let class_handle = data_class.clone().unbind();
    python_data_classes().insert(key, class_handle);
}
234
/// Looks up the Python class registered under `type_name`.
///
/// Returns a new reference to the class, or `None` when nothing is registered under
/// that name.
#[cfg(feature = "python")]
#[must_use]
pub fn get_python_data_class(py: Python<'_>, type_name: &str) -> Option<Py<PyAny>> {
    let entry = python_data_classes().get(type_name)?;
    Some(entry.value().clone_ref(py))
}
242
/// Rebuilds a Python custom-data object from its JSON text.
///
/// The class registered under `type_name` is looked up, `json` is parsed with Python's
/// `json.loads`, and the parsed payload is passed to the class's `from_json`.
///
/// # Errors
///
/// Returns a runtime error if no class is registered for `type_name`, and propagates any
/// error raised while importing `json`, parsing, or calling `from_json`.
#[cfg(feature = "python")]
pub fn reconstruct_python_custom_data(
    py: Python<'_>,
    type_name: &str,
    json: &str,
) -> PyResult<Py<PyAny>> {
    let Some(data_class) = get_python_data_class(py, type_name) else {
        return Err(nautilus_core::python::to_pyruntime_err(format!(
            "No registered Python class for custom data type `{type_name}`"
        )));
    };

    let payload = py.import("json")?.call_method1("loads", (json,))?;
    let instance = data_class.bind(py).call_method1("from_json", (payload,))?;
    Ok(instance.unbind())
}
266
/// Clones `value` and converts the clone into a Python object.
///
/// # Errors
///
/// Returns the error produced by the `IntoPyObject` conversion, if any.
#[cfg(feature = "python")]
pub fn clone_pyclass_to_pyobject<T>(value: &T, py: Python<'_>) -> PyResult<Py<PyAny>>
where
    T: Clone,
    for<'py> T: pyo3::IntoPyObject<'py, Error = pyo3::PyErr>,
{
    let owned = T::clone(value);
    owned.into_py_any(py)
}
283
/// Interface implemented by custom data payloads carried inside [`CustomData`].
///
/// Implementors must also implement [`HasTsInit`], `Send`, `Sync` and `Debug`.
pub trait CustomDataTrait: HasTsInit + Send + Sync + Debug {
    /// Returns the type name of this value.
    fn type_name(&self) -> &'static str;

    /// Returns `self` as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the `ts_event` timestamp of this value.
    fn ts_event(&self) -> UnixNanos;

    /// Serializes this value to a JSON string.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails.
    fn to_json(&self) -> anyhow::Result<String>;

    /// Serializes this value to a JSON string; the default implementation delegates to
    /// [`to_json`](Self::to_json).
    // NOTE(review): presumably an override point for Python-facing serialization; it is
    // not called anywhere in this file, so confirm against callers.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails.
    fn to_json_py(&self) -> anyhow::Result<String> {
        self.to_json()
    }

    /// Returns this value as a new `Arc<dyn CustomDataTrait>`.
    fn clone_arc(&self) -> Arc<dyn CustomDataTrait>;

    /// Returns whether `self` equals `other`, where `other` may be any implementor.
    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool;

    /// Converts this value into a Python object.
    ///
    /// # Errors
    ///
    /// The default implementation always returns a type error naming
    /// [`type_name`](Self::type_name).
    #[cfg(feature = "python")]
    fn to_pyobject(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
        Err(nautilus_core::python::to_pytype_err(format!(
            "to_pyobject not implemented for {}",
            self.type_name()
        )))
    }

    /// Returns the type name without needing an instance.
    ///
    /// The default is `std::any::type_name::<Self>()`.
    #[must_use]
    fn type_name_static() -> &'static str
    where
        Self: Sized,
    {
        std::any::type_name::<Self>()
    }

    /// Builds a value of this type from a parsed JSON value.
    ///
    /// # Errors
    ///
    /// The default implementation always returns an error stating that `from_json` is
    /// not implemented for the type.
    fn from_json(_value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>>
    where
        Self: Sized,
    {
        anyhow::bail!(
            "from_json not implemented for {}",
            std::any::type_name::<Self>()
        )
    }
}
351
352pub fn register_custom_data_json<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
358 let type_name = T::type_name_static();
359 register_json_deserializer(type_name, Box::new(|value| T::from_json(value)))
360}
361
362pub fn ensure_custom_data_json_registered<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
368 let type_name = T::type_name_static();
369 ensure_json_deserializer_registered(type_name, Box::new(|value| T::from_json(value)))
370}
371
/// A custom data payload paired with the [`DataType`] that describes it.
///
/// Cloning shares the payload (`Arc`). Equality compares both the payload, via
/// `eq_arc`, and the data type.
#[cfg_attr(
    feature = "python",
    pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.model",
        name = "CustomData",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
#[derive(Clone, Debug)]
pub struct CustomData {
    /// The payload.
    pub data: Arc<dyn CustomDataTrait>,
    /// The data type (type name plus optional metadata and identifier).
    pub data_type: DataType,
}
396
397impl CustomData {
398 pub fn from_arc(arc: Arc<dyn CustomDataTrait>) -> Self {
401 let data_type = DataType::new(arc.type_name(), None, None);
402 Self {
403 data: arc,
404 data_type,
405 }
406 }
407
408 pub fn new(data: Arc<dyn CustomDataTrait>, data_type: DataType) -> Self {
413 Self { data, data_type }
414 }
415}
416
417impl PartialEq for CustomData {
418 fn eq(&self, other: &Self) -> bool {
419 self.data.eq_arc(other.data.as_ref()) && self.data_type == other.data_type
420 }
421}
422
impl HasTsInit for CustomData {
    /// Returns the `ts_init` of the wrapped payload.
    fn ts_init(&self) -> UnixNanos {
        self.data.ts_init()
    }
}
428
429pub(crate) fn parse_custom_data_from_json_bytes(
430 bytes: &[u8],
431) -> Result<CustomData, serde_json::Error> {
432 let data: Data = serde_json::from_slice(bytes)?;
433 match data {
434 Data::Custom(custom) => Ok(custom),
435 _ => Err(serde_json::Error::io(std::io::Error::new(
436 std::io::ErrorKind::InvalidData,
437 "JSON does not represent CustomData",
438 ))),
439 }
440}
441
impl CustomData {
    /// Parses a `CustomData` from JSON bytes.
    ///
    /// Delegates to `parse_custom_data_from_json_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an error if the bytes cannot be deserialized or do not represent
    /// `CustomData`.
    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        parse_custom_data_from_json_bytes(bytes)
    }
}
452
453struct CustomDataEnvelope {
456 type_name: String,
457 data_type: serde_json::Value,
458 payload: serde_json::Value,
459}
460
461impl Serialize for CustomDataEnvelope {
462 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
463 where
464 S: Serializer,
465 {
466 use serde::ser::SerializeStruct;
467 let mut state = serializer.serialize_struct("CustomDataEnvelope", 3)?;
468 state.serialize_field("type", &self.type_name)?;
469 state.serialize_field("data_type", &self.data_type)?;
470 state.serialize_field("payload", &self.payload)?;
471 state.end()
472 }
473}
474
475impl CustomData {
476 fn to_envelope_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
477 let json = self.data.to_json().map_err(|e| {
478 serde_json::Error::io(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
479 })?;
480 let payload: serde_json::Value = serde_json::from_str(&json)?;
481 let metadata_value = self.data_type.metadata().map_or(
482 serde_json::Value::Object(serde_json::Map::new()),
483 |m| {
484 serde_json::to_value(m).unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
485 },
486 );
487 let mut data_type_obj = serde_json::Map::new();
488 data_type_obj.insert(
489 "type_name".to_string(),
490 serde_json::Value::String(self.data_type.type_name().to_string()),
491 );
492 data_type_obj.insert("metadata".to_string(), metadata_value);
493
494 if let Some(id) = self.data_type.identifier() {
495 data_type_obj.insert(
496 "identifier".to_string(),
497 serde_json::Value::String(id.to_string()),
498 );
499 }
500
501 let envelope = CustomDataEnvelope {
502 type_name: self.data.type_name().to_string(),
503 data_type: serde_json::Value::Object(data_type_obj),
504 payload,
505 };
506 serde_json::to_value(envelope)
507 }
508}
509
510impl Serialize for CustomData {
511 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
512 where
513 S: Serializer,
514 {
515 let value = self
516 .to_envelope_json_value()
517 .map_err(serde::ser::Error::custom)?;
518 value.serialize(serializer)
519 }
520}
521
#[cfg(test)]
mod tests {
    use nautilus_core::{Params, UnixNanos};
    use rstest::rstest;
    use serde::Deserialize;
    use serde_json::json;

    use super::*;
    use crate::{data::HasTsInit, identifiers::InstrumentId};

    /// Type name used by the test payload for both its instance and static name.
    const TEST_TYPE_NAME: &str = "TestCustomData";

    /// Minimal serde-backed payload used to exercise `CustomData`.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestCustomData {
        ts_init: UnixNanos,
        instrument_id: InstrumentId,
    }

    impl HasTsInit for TestCustomData {
        fn ts_init(&self) -> UnixNanos {
            self.ts_init
        }
    }

    impl CustomDataTrait for TestCustomData {
        fn type_name(&self) -> &'static str {
            TEST_TYPE_NAME
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        // The test payload has a single timestamp, reused for `ts_event`.
        fn ts_event(&self) -> UnixNanos {
            self.ts_init
        }

        fn to_json(&self) -> anyhow::Result<String> {
            let encoded = serde_json::to_string(self)?;
            Ok(encoded)
        }

        fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
            Arc::new(self.clone())
        }

        fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
            other
                .as_any()
                .downcast_ref::<Self>()
                .is_some_and(|rhs| self == rhs)
        }

        fn type_name_static() -> &'static str {
            TEST_TYPE_NAME
        }

        fn from_json(value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>> {
            let decoded: Self = serde_json::from_value(value)?;
            Ok(Arc::new(decoded))
        }
    }

    /// Serializing a `CustomData` and parsing it back preserves the data type fields and
    /// the payload.
    #[rstest]
    fn test_custom_data_json_roundtrip() {
        register_custom_data_json::<TestCustomData>()
            .expect("TestCustomData must register for JSON roundtrip test");

        let instrument_id = InstrumentId::from("TEST.SIM");
        let metadata: Params =
            serde_json::from_value(json!({"key1": "value1", "key2": "value2"})).unwrap();
        let payload = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new(
            "TestCustomData",
            Some(metadata),
            Some(instrument_id.to_string()),
        );
        let original = CustomData::new(Arc::new(payload), data_type);

        let encoded = serde_json::to_vec(&original).unwrap();
        let decoded = CustomData::from_json_bytes(&encoded).unwrap();

        assert_eq!(
            decoded.data_type.type_name(),
            original.data_type.type_name()
        );
        assert_eq!(decoded.data_type.metadata(), original.data_type.metadata());
        assert_eq!(
            decoded.data_type.identifier(),
            original.data_type.identifier()
        );

        let expected_payload = original
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        let actual_payload = decoded
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        assert_eq!(expected_payload, actual_payload);
    }

    /// `CustomData` exposes the payload's `ts_init`, and `Data::Custom` reports the
    /// instrument id taken from the data type identifier.
    #[rstest]
    fn test_custom_data_wrapper() {
        let instrument_id = InstrumentId::from("TEST.SIM");
        let payload = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new("TestCustomData", None, Some(instrument_id.to_string()));
        let custom_data = CustomData::new(Arc::new(payload), data_type);

        assert_eq!(custom_data.data.ts_init(), UnixNanos::from(100));
        assert_eq!(Data::Custom(custom_data).instrument_id(), instrument_id);
    }
}