Skip to main content

nautilus_model/data/
custom.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16#[cfg(feature = "python")]
17use std::collections::HashSet;
18#[cfg(feature = "python")]
19use std::sync::RwLock;
20use std::{any::Any, fmt::Debug, sync::Arc};
21
22use nautilus_core::UnixNanos;
23#[cfg(feature = "python")]
24use pyo3::{IntoPyObjectExt, prelude::*, types::PyAny};
25use serde::{Serialize, Serializer};
26
27use crate::data::{
28    Data, DataType, HasTsInit,
29    registry::{ensure_json_deserializer_registered, register_json_deserializer},
30};
31
/// Returns a `'static` copy of `name`, leaking at most one allocation per distinct name.
///
/// `CustomDataTrait::type_name` must return `&'static str`, but some type names are only
/// discovered at runtime (e.g. from Python classes), so they are leaked once and cached in
/// a global set.
///
/// A poisoned lock is recovered rather than bypassed. The set only ever holds leaked
/// `&'static str` values, so its contents stay valid after a panic in another thread.
/// Bypassing it instead would leak a fresh allocation on every subsequent call.
#[cfg(feature = "python")]
fn intern_type_name_static(name: String) -> &'static str {
    static INTERNER: std::sync::OnceLock<RwLock<HashSet<&'static str>>> =
        std::sync::OnceLock::new();
    let set = INTERNER.get_or_init(|| RwLock::new(HashSet::new()));

    // Fast path: already-interned names only need a shared lock. The scope ensures the
    // read guard is released before the write lock below is requested.
    {
        let guard = set.read().unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(&existing) = guard.get(name.as_str()) {
            return existing;
        }
    }

    let mut guard = set.write().unwrap_or_else(|poisoned| {
        log::warn!("intern_type_name_static: RwLock poisoned, recovering interner state");
        poisoned.into_inner()
    });

    // Re-check: another thread may have interned the same name between the two locks.
    if let Some(&existing) = guard.get(name.as_str()) {
        return existing;
    }

    let leaked: &'static str = Box::leak(name.into_boxed_str());
    guard.insert(leaked);
    leaked
}
56
/// Wraps a Python custom data object so it can participate in the Rust data
/// pipeline as an `Arc<dyn CustomDataTrait>`.
///
/// Holds a reference to the Python object. Trait methods that need the object itself
/// (e.g. `to_json`, `eq_arc`, `clone`) acquire the Python GIL. `ts_event`, `ts_init`,
/// and the type name are cached at construction to avoid GIL acquisition in the hot
/// path (e.g., data sorting, message routing).
#[cfg(feature = "python")]
pub struct PythonCustomDataWrapper {
    /// The Python object implementing the custom data interface.
    py_object: Py<PyAny>,
    /// Cached `ts_event` value (extracted once at construction).
    cached_ts_event: UnixNanos,
    /// Cached `ts_init` value (extracted once at construction).
    cached_ts_init: UnixNanos,
    /// Cached owned type name (extracted once at construction).
    cached_type_name: String,
    /// Interned `'static` copy of `cached_type_name`, returned by `type_name()` (the trait
    /// signature requires `&'static str`). Interning means one leaked allocation per
    /// distinct type name, not one per wrapper.
    cached_type_name_static: &'static str,
}
76
#[cfg(feature = "python")]
impl PythonCustomDataWrapper {
    /// Builds a wrapper around `py_object`, reading its timestamps and type name once.
    ///
    /// The type name comes from the class's `type_name_static()` method when the class
    /// defines one, and from the class `__name__` otherwise. The name is interned so
    /// that `type_name()` can return a `&'static str`.
    ///
    /// # Errors
    /// Returns an error if `ts_event` or `ts_init` is missing or cannot be extracted as
    /// `u64`, or if the type name cannot be obtained as a string.
    pub fn new(_py: Python<'_>, py_object: &Bound<'_, PyAny>) -> PyResult<Self> {
        let raw_ts_event: u64 = py_object.getattr("ts_event")?.extract()?;
        let raw_ts_init: u64 = py_object.getattr("ts_init")?.extract()?;

        let cls = py_object.get_type();
        let has_static_name = cls.hasattr("type_name_static")?;
        let name: String = if has_static_name {
            cls.call_method0("type_name_static")?.extract()?
        } else {
            cls.getattr("__name__")?.extract()?
        };

        Ok(Self {
            py_object: py_object.clone().unbind(),
            cached_ts_event: UnixNanos::from(raw_ts_event),
            cached_ts_init: UnixNanos::from(raw_ts_init),
            // Intern before moving `name` into the owned field below.
            cached_type_name_static: intern_type_name_static(name.clone()),
            cached_type_name: name,
        })
    }

    /// Borrows the wrapped Python object.
    #[must_use]
    pub fn py_object(&self) -> &Py<PyAny> {
        &self.py_object
    }

    /// Borrows the type name captured at construction.
    #[must_use]
    pub fn get_type_name(&self) -> &str {
        self.cached_type_name.as_str()
    }
}
126
#[cfg(feature = "python")]
impl Clone for PythonCustomDataWrapper {
    /// Clones the wrapper. The Python reference is duplicated with `clone_ref` under the
    /// GIL; the cached fields are copied directly.
    fn clone(&self) -> Self {
        let py_object = Python::attach(|py| self.py_object.clone_ref(py));
        Self {
            py_object,
            cached_ts_event: self.cached_ts_event,
            cached_ts_init: self.cached_ts_init,
            cached_type_name: self.cached_type_name.clone(),
            cached_type_name_static: self.cached_type_name_static,
        }
    }
}
139
#[cfg(feature = "python")]
impl Debug for PythonCustomDataWrapper {
    /// Formats the wrapped object together with its cached type name and timestamps.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PythonCustomDataWrapper");
        out.field("py_object", &self.py_object);
        out.field("type_name", &self.cached_type_name);
        out.field("type_name_static", &self.cached_type_name_static);
        out.field("ts_event", &self.cached_ts_event);
        out.field("ts_init", &self.cached_ts_init);
        out.finish()
    }
}
152
#[cfg(feature = "python")]
impl HasTsInit for PythonCustomDataWrapper {
    /// Returns the `ts_init` captured when the wrapper was built; no GIL is taken.
    fn ts_init(&self) -> UnixNanos {
        let Self { cached_ts_init, .. } = self;
        *cached_ts_init
    }
}
159
#[cfg(feature = "python")]
impl CustomDataTrait for PythonCustomDataWrapper {
    /// Returns the interned type name cached at construction (no GIL needed).
    fn type_name(&self) -> &'static str {
        self.cached_type_name_static
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the `ts_event` value cached at construction (no GIL needed).
    fn ts_event(&self) -> UnixNanos {
        self.cached_ts_event
    }

    /// Serializes the wrapped Python object to JSON.
    ///
    /// Calls the object's `to_json()` method when present. Otherwise falls back to
    /// `json.dumps(obj.__dict__)`.
    ///
    /// # Errors
    /// Returns an error if the object has neither `to_json()` nor `__dict__`, or if any
    /// Python call or string extraction fails.
    fn to_json(&self) -> anyhow::Result<String> {
        Python::attach(|py| {
            let obj = self.py_object.bind(py);
            if obj.hasattr("to_json")? {
                let json_str: String = obj.call_method0("to_json")?.extract()?;
                return Ok(json_str);
            }

            // Check that a fallback is possible before importing `json`.
            if !obj.hasattr("__dict__")? {
                anyhow::bail!("Python object has no to_json() method or __dict__ attribute");
            }
            let dict = obj.getattr("__dict__")?;
            let json_module = py.import("json")?;
            let json_str: String = json_module.call_method1("dumps", (dict,))?.extract()?;
            Ok(json_str)
        })
    }

    fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
        Arc::new(self.clone())
    }

    /// Compares two wrapped Python objects.
    ///
    /// Returns `false` if `other` is not a `PythonCustomDataWrapper`. Otherwise returns
    /// `true` when both wrap the same Python object (identity), and falls back to
    /// Python `==` (`__eq__`) for distinct objects. An exception raised by `__eq__`
    /// is treated as "not equal".
    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
        let Some(other_wrapper) = other.as_any().downcast_ref::<Self>() else {
            return false;
        };
        Python::attach(|py| {
            let a = self.py_object.bind(py);
            let b = other_wrapper.py_object.bind(py);
            // The identity check avoids invoking Python `__eq__` for the same object.
            a.is(b) || a.eq(b).unwrap_or(false)
        })
    }

    /// Returns a new reference to the underlying Python object (no conversion).
    fn to_pyobject(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        Ok(self.py_object.clone_ref(py))
    }
}
222
/// Global registry mapping custom data type names to the Python classes used to
/// reconstruct them.
#[cfg(feature = "python")]
fn python_data_classes() -> &'static dashmap::DashMap<String, Py<PyAny>> {
    static PYTHON_DATA_CLASSES: std::sync::LazyLock<dashmap::DashMap<String, Py<PyAny>>> =
        std::sync::LazyLock::new(dashmap::DashMap::new);
    &PYTHON_DATA_CLASSES
}
229
/// Registers `data_class` as the Python class for custom data named `type_name`.
///
/// Any class previously registered under the same name is replaced.
#[cfg(feature = "python")]
pub fn register_python_data_class(type_name: &str, data_class: &Bound<'_, PyAny>) {
    let class_ref: Py<PyAny> = data_class.clone().unbind();
    python_data_classes().insert(String::from(type_name), class_ref);
}
234
/// Looks up the Python class registered for `type_name`.
///
/// Returns a new reference to the class, or `None` if nothing is registered under that name.
#[cfg(feature = "python")]
#[must_use]
pub fn get_python_data_class(py: Python<'_>, type_name: &str) -> Option<Py<PyAny>> {
    let entry = python_data_classes().get(type_name)?;
    Some(entry.value().clone_ref(py))
}
242
/// Rebuilds a Python custom data instance from its type name and JSON text.
///
/// The JSON is parsed with Python's `json.loads`. The result is passed to the
/// registered class's `from_json` method.
///
/// # Errors
///
/// Returns a Python error if no class is registered for `type_name`, if JSON parsing
/// fails, or if `from_json` raises.
#[cfg(feature = "python")]
pub fn reconstruct_python_custom_data(
    py: Python<'_>,
    type_name: &str,
    json: &str,
) -> PyResult<Py<PyAny>> {
    let Some(data_class) = get_python_data_class(py, type_name) else {
        return Err(nautilus_core::python::to_pyruntime_err(format!(
            "No registered Python class for custom data type `{type_name}`"
        )));
    };

    let payload = py.import("json")?.call_method1("loads", (json,))?;
    let instance = data_class.bind(py).call_method1("from_json", (payload,))?;
    Ok(instance.unbind())
}
266
/// Clones `value` and hands the owned copy to PyO3 for conversion into a Python object.
///
/// Meant for `#[pyclass]` custom data types, for which PyO3 supplies an
/// `IntoPyObject` implementation on owned values.
///
/// # Errors
///
/// Propagates any conversion error reported by PyO3.
#[cfg(feature = "python")]
pub fn clone_pyclass_to_pyobject<T>(value: &T, py: Python<'_>) -> PyResult<Py<PyAny>>
where
    T: Clone,
    for<'py> T: pyo3::IntoPyObject<'py, Error = pyo3::PyErr>,
{
    let owned: T = value.clone();
    owned.into_py_any(py)
}
283
284/// Trait for typed custom data that can be used within the Nautilus domain model.
285pub trait CustomDataTrait: HasTsInit + Send + Sync + Debug {
286    /// Returns the type name for the custom data.
287    fn type_name(&self) -> &'static str;
288
289    /// Returns the data as a `dyn Any` for downcasting.
290    fn as_any(&self) -> &dyn Any;
291
292    /// Returns the event timestamp (when the data occurred).
293    fn ts_event(&self) -> UnixNanos;
294
295    /// Serializes the custom data to a JSON string.
296    ///
297    /// # Errors
298    /// Returns an error if JSON serialization fails.
299    fn to_json(&self) -> anyhow::Result<String>;
300
301    /// Python-facing JSON serialization. Default implementation forwards to `to_json`.
302    /// Override if a different behavior is needed for the Python API.
303    ///
304    /// # Errors
305    /// Returns an error if JSON serialization fails.
306    fn to_json_py(&self) -> anyhow::Result<String> {
307        self.to_json()
308    }
309
310    /// Returns a cloned Arc of the custom data.
311    fn clone_arc(&self) -> Arc<dyn CustomDataTrait>;
312
313    /// Returns whether the custom data is equal to another.
314    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool;
315
316    /// Converts the custom data to a Python object.
317    ///
318    /// # Errors
319    /// Returns an error if PyO3 conversion fails.
320    #[cfg(feature = "python")]
321    fn to_pyobject(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
322        Err(nautilus_core::python::to_pytype_err(format!(
323            "to_pyobject not implemented for {}",
324            self.type_name()
325        )))
326    }
327
328    /// Returns the type name used in serialized form (e.g. in the `"type"` field).
329    #[must_use]
330    fn type_name_static() -> &'static str
331    where
332        Self: Sized,
333    {
334        std::any::type_name::<Self>()
335    }
336
337    /// Deserializes from a JSON value into an Arc'd trait object.
338    ///
339    /// # Errors
340    /// Returns an error if JSON deserialization fails.
341    fn from_json(_value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>>
342    where
343        Self: Sized,
344    {
345        anyhow::bail!(
346            "from_json not implemented for {}",
347            std::any::type_name::<Self>()
348        )
349    }
350}
351
352/// Registers a custom data type for JSON deserialization. When `Data::deserialize`
353/// sees the type name returned by `T::type_name_static()`, it will call `T::from_json`.
354///
355/// # Errors
356/// Returns an error if the type is already registered.
357pub fn register_custom_data_json<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
358    let type_name = T::type_name_static();
359    register_json_deserializer(type_name, Box::new(|value| T::from_json(value)))
360}
361
362/// Registers a custom data type for JSON deserialization if not already registered.
363/// Idempotent: safe to call multiple times for the same type (e.g. module init).
364///
365/// # Errors
366/// Does not return an error (idempotent insert into `DashMap`).
367pub fn ensure_custom_data_json_registered<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
368    let type_name = T::type_name_static();
369    ensure_json_deserializer_registered(type_name, Box::new(|value| T::from_json(value)))
370}
371
372/// A wrapper for custom data including its data type.
373///
374/// The `data` field holds an [`Arc`] to a [`CustomDataTrait`] implementation,
375/// enabling cheap cloning when passing to Python (Arc clone is O(1)).
376/// Custom data is always Rust-defined (optionally with PyO3 bindings).
377#[cfg_attr(
378    feature = "python",
379    pyclass(
380        module = "nautilus_trader.core.nautilus_pyo3.model",
381        name = "CustomData",
382        from_py_object
383    )
384)]
385#[cfg_attr(
386    feature = "python",
387    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
388)]
389#[derive(Clone, Debug)]
390pub struct CustomData {
391    /// The actual data object implementing [`CustomDataTrait`].
392    pub data: Arc<dyn CustomDataTrait>,
393    /// The data type metadata.
394    pub data_type: DataType,
395}
396
397impl CustomData {
398    /// Creates a new [`CustomData`] instance from an [`Arc`]'d [`CustomDataTrait`],
399    /// deriving the data type from the inner type name.
400    pub fn from_arc(arc: Arc<dyn CustomDataTrait>) -> Self {
401        let data_type = DataType::new(arc.type_name(), None, None);
402        Self {
403            data: arc,
404            data_type,
405        }
406    }
407
408    /// Creates a new [`CustomData`] instance with explicit data type metadata.
409    ///
410    /// Use this when the data type must come from external metadata (e.g. Parquet),
411    /// rather than being derived from the inner type name.
412    pub fn new(data: Arc<dyn CustomDataTrait>, data_type: DataType) -> Self {
413        Self { data, data_type }
414    }
415}
416
417impl PartialEq for CustomData {
418    fn eq(&self, other: &Self) -> bool {
419        self.data.eq_arc(other.data.as_ref()) && self.data_type == other.data_type
420    }
421}
422
423impl HasTsInit for CustomData {
424    fn ts_init(&self) -> UnixNanos {
425        self.data.ts_init()
426    }
427}
428
429pub(crate) fn parse_custom_data_from_json_bytes(
430    bytes: &[u8],
431) -> Result<CustomData, serde_json::Error> {
432    let data: Data = serde_json::from_slice(bytes)?;
433    match data {
434        Data::Custom(custom) => Ok(custom),
435        _ => Err(serde_json::Error::io(std::io::Error::new(
436            std::io::ErrorKind::InvalidData,
437            "JSON does not represent CustomData",
438        ))),
439    }
440}
441
442impl CustomData {
443    /// Deserializes `CustomData` from JSON bytes (full `CustomData` format with type and `data_type`).
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if the bytes are not valid JSON or do not represent `CustomData`.
448    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
449        parse_custom_data_from_json_bytes(bytes)
450    }
451}
452
453/// Canonical JSON envelope for `CustomData`. All serialized `CustomData` uses this shape so
454/// deserialization can extract the payload without depending on user payload field names.
455struct CustomDataEnvelope {
456    type_name: String,
457    data_type: serde_json::Value,
458    payload: serde_json::Value,
459}
460
461impl Serialize for CustomDataEnvelope {
462    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
463    where
464        S: Serializer,
465    {
466        use serde::ser::SerializeStruct;
467        let mut state = serializer.serialize_struct("CustomDataEnvelope", 3)?;
468        state.serialize_field("type", &self.type_name)?;
469        state.serialize_field("data_type", &self.data_type)?;
470        state.serialize_field("payload", &self.payload)?;
471        state.end()
472    }
473}
474
475impl CustomData {
476    fn to_envelope_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
477        let json = self.data.to_json().map_err(|e| {
478            serde_json::Error::io(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
479        })?;
480        let payload: serde_json::Value = serde_json::from_str(&json)?;
481        let metadata_value = self.data_type.metadata().map_or(
482            serde_json::Value::Object(serde_json::Map::new()),
483            |m| {
484                serde_json::to_value(m).unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
485            },
486        );
487        let mut data_type_obj = serde_json::Map::new();
488        data_type_obj.insert(
489            "type_name".to_string(),
490            serde_json::Value::String(self.data_type.type_name().to_string()),
491        );
492        data_type_obj.insert("metadata".to_string(), metadata_value);
493
494        if let Some(id) = self.data_type.identifier() {
495            data_type_obj.insert(
496                "identifier".to_string(),
497                serde_json::Value::String(id.to_string()),
498            );
499        }
500
501        let envelope = CustomDataEnvelope {
502            type_name: self.data.type_name().to_string(),
503            data_type: serde_json::Value::Object(data_type_obj),
504            payload,
505        };
506        serde_json::to_value(envelope)
507    }
508}
509
510impl Serialize for CustomData {
511    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
512    where
513        S: Serializer,
514    {
515        let value = self
516            .to_envelope_json_value()
517            .map_err(serde::ser::Error::custom)?;
518        value.serialize(serializer)
519    }
520}
521
#[cfg(test)]
mod tests {
    use nautilus_core::{Params, UnixNanos};
    use rstest::rstest;
    use serde::Deserialize;
    use serde_json::json;

    use super::*;
    use crate::{data::HasTsInit, identifiers::InstrumentId};

    /// Minimal Rust-defined custom data type used by the tests below.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestCustomData {
        ts_init: UnixNanos,
        instrument_id: InstrumentId,
    }

    impl HasTsInit for TestCustomData {
        fn ts_init(&self) -> UnixNanos {
            self.ts_init
        }
    }

    impl CustomDataTrait for TestCustomData {
        fn type_name(&self) -> &'static str {
            // Kept identical to `type_name_static` so the JSON envelope roundtrips.
            Self::type_name_static()
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn ts_event(&self) -> UnixNanos {
            // The test type has no separate event time; reuse `ts_init`.
            self.ts_init
        }

        fn to_json(&self) -> anyhow::Result<String> {
            serde_json::to_string(self).map_err(anyhow::Error::from)
        }

        fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
            Arc::new(self.clone())
        }

        fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
            other
                .as_any()
                .downcast_ref::<Self>()
                .is_some_and(|rhs| self == rhs)
        }

        fn type_name_static() -> &'static str {
            "TestCustomData"
        }

        fn from_json(value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>> {
            let parsed = serde_json::from_value::<Self>(value)?;
            Ok(Arc::new(parsed))
        }
    }

    /// Borrows the `TestCustomData` inside a `CustomData`, panicking on a type mismatch.
    fn inner_test_data(custom: &CustomData) -> &TestCustomData {
        custom
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .expect("inner data should be TestCustomData")
    }

    #[rstest]
    fn test_custom_data_json_roundtrip() {
        register_custom_data_json::<TestCustomData>()
            .expect("TestCustomData must register for JSON roundtrip test");

        let instrument_id = InstrumentId::from("TEST.SIM");
        let params: Params =
            serde_json::from_value(json!({"key1": "value1", "key2": "value2"})).unwrap();
        let data_type =
            DataType::new("TestCustomData", Some(params), Some(instrument_id.to_string()));
        let inner = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let original = CustomData::new(Arc::new(inner), data_type);

        let bytes = serde_json::to_vec(&original).unwrap();
        let restored = CustomData::from_json_bytes(&bytes).unwrap();

        let (restored_dt, original_dt) = (&restored.data_type, &original.data_type);
        assert_eq!(restored_dt.type_name(), original_dt.type_name());
        assert_eq!(restored_dt.metadata(), original_dt.metadata());
        assert_eq!(restored_dt.identifier(), original_dt.identifier());
        assert_eq!(inner_test_data(&original), inner_test_data(&restored));
    }

    #[rstest]
    fn test_custom_data_wrapper() {
        let instrument_id = InstrumentId::from("TEST.SIM");
        let custom_data = CustomData::new(
            Arc::new(TestCustomData {
                ts_init: UnixNanos::from(100),
                instrument_id,
            }),
            DataType::new("TestCustomData", None, Some(instrument_id.to_string())),
        );

        assert_eq!(custom_data.data.ts_init(), UnixNanos::from(100));
        assert_eq!(Data::Custom(custom_data).instrument_id(), instrument_id);
    }
}