Skip to main content

nautilus_model/data/
custom.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16#[cfg(feature = "python")]
17use std::collections::HashSet;
18#[cfg(feature = "python")]
19use std::sync::RwLock;
20use std::{any::Any, fmt::Debug, sync::Arc};
21
22use nautilus_core::UnixNanos;
23#[cfg(feature = "python")]
24use pyo3::{IntoPyObjectExt, prelude::*, types::PyAny};
25use serde::{Serialize, Serializer};
26
27use crate::data::{
28    Data, DataType, HasTsInit,
29    registry::{ensure_json_deserializer_registered, register_json_deserializer},
30};
31
32#[cfg(feature = "python")]
/// Returns a `&'static str` equal to `name`, allocating at most one leaked copy per
/// distinct name for the lifetime of the process.
///
/// The first call for a given name leaks a boxed copy of the string and records it in a
/// process-wide set; later calls with an equal name return that same leaked slice.
///
/// A poisoned lock is recovered rather than treated as an error: the set only ever gains
/// entries, so the guard taken from a poisoned lock is still usable. This avoids leaking a
/// fresh, un-interned string on every call once the lock has been poisoned.
fn intern_type_name_static(name: String) -> &'static str {
    static INTERNER: std::sync::OnceLock<RwLock<HashSet<&'static str>>> =
        std::sync::OnceLock::new();
    let set = INTERNER.get_or_init(|| RwLock::new(HashSet::new()));

    // Fast path: shared lock and a single lookup for names that are already interned.
    {
        let guard = set
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if let Some(&existing) = guard.get(name.as_str()) {
            return existing;
        }
    }

    let mut guard = set
        .write()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Re-check under the exclusive lock: another thread may have interned the same name
    // between releasing the read lock and acquiring the write lock.
    if let Some(&existing) = guard.get(name.as_str()) {
        return existing;
    }

    let leaked: &'static str = Box::leak(name.into_boxed_str());
    guard.insert(leaked);
    leaked
}
59
/// Wraps a Python custom data object so it can participate in the Rust data
/// pipeline as an `Arc<dyn CustomDataTrait>`.
///
/// Holds a reference to the Python object and delegates JSON serialization,
/// equality, and conversion back to Python to it while attached to the Python
/// interpreter. `ts_event`, `ts_init`, and the type name are cached at construction
/// so that reading them does not require the GIL in the hot path (e.g., data
/// sorting, message routing).
#[cfg(feature = "python")]
pub struct PythonCustomDataWrapper {
    /// The Python object implementing the custom data interface.
    py_object: Py<PyAny>,
    /// Cached `ts_event` value (extracted once at construction).
    cached_ts_event: UnixNanos,
    /// Cached `ts_init` value (extracted once at construction).
    cached_ts_init: UnixNanos,
    /// Cached type name as an owned string (extracted once at construction).
    cached_type_name: String,
    /// Interned `&'static str` copy of the type name, returned by `type_name()`
    /// because the trait signature requires a `'static` lifetime.
    cached_type_name_static: &'static str,
}
79
#[cfg(feature = "python")]
impl PythonCustomDataWrapper {
    /// Builds a wrapper around `py_object`, snapshotting the values the Rust side needs.
    ///
    /// The `ts_event` and `ts_init` attributes are read as unsigned 64-bit integers. The
    /// type name is obtained by calling `type_name_static()` on the object's class when the
    /// class has such an attribute, and from the class `__name__` otherwise.
    ///
    /// # Errors
    /// Returns an error if any attribute lookup, method call, or value extraction on the
    /// Python object fails.
    pub fn new(_py: Python<'_>, py_object: &Bound<'_, PyAny>) -> PyResult<Self> {
        let cached_ts_event = UnixNanos::from(py_object.getattr("ts_event")?.extract::<u64>()?);
        let cached_ts_init = UnixNanos::from(py_object.getattr("ts_init")?.extract::<u64>()?);

        // Prefer the class-level `type_name_static()` over the plain class name.
        let py_class = py_object.get_type();
        let name_source = if py_class.hasattr("type_name_static")? {
            py_class.call_method0("type_name_static")?
        } else {
            py_class.getattr("__name__")?
        };
        let cached_type_name: String = name_source.extract()?;

        // One leaked static copy is shared by every wrapper with the same type name.
        let cached_type_name_static = intern_type_name_static(cached_type_name.clone());

        Ok(Self {
            py_object: py_object.clone().unbind(),
            cached_ts_event,
            cached_ts_init,
            cached_type_name,
            cached_type_name_static,
        })
    }

    /// Borrows the wrapped Python object.
    #[must_use]
    pub fn py_object(&self) -> &Py<PyAny> {
        &self.py_object
    }

    /// Borrows the type name captured at construction.
    #[must_use]
    pub fn get_type_name(&self) -> &str {
        self.cached_type_name.as_str()
    }
}
129
#[cfg(feature = "python")]
impl Clone for PythonCustomDataWrapper {
    /// Produces a wrapper that refers to the same Python object.
    ///
    /// The interpreter is attached only to take a new reference to the object; the cached
    /// timestamps and type name are copied from `self`.
    fn clone(&self) -> Self {
        let py_object = Python::attach(|py| self.py_object.clone_ref(py));
        Self {
            py_object,
            cached_ts_event: self.cached_ts_event,
            cached_ts_init: self.cached_ts_init,
            cached_type_name: self.cached_type_name.clone(),
            cached_type_name_static: self.cached_type_name_static,
        }
    }
}
142
#[cfg(feature = "python")]
impl Debug for PythonCustomDataWrapper {
    /// Formats the cached type name and timestamps; the Python object itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct(stringify!(PythonCustomDataWrapper));
        builder.field("type_name", &self.cached_type_name);
        builder.field("ts_event", &self.cached_ts_event);
        builder.field("ts_init", &self.cached_ts_init);
        builder.finish()
    }
}
153
#[cfg(feature = "python")]
impl HasTsInit for PythonCustomDataWrapper {
    /// Returns the `ts_init` value captured when the wrapper was built.
    fn ts_init(&self) -> UnixNanos {
        let Self { cached_ts_init, .. } = self;
        *cached_ts_init
    }
}
160
#[cfg(feature = "python")]
impl CustomDataTrait for PythonCustomDataWrapper {
    /// Returns the interned static copy of the cached type name.
    fn type_name(&self) -> &'static str {
        self.cached_type_name_static
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the `ts_event` value captured when the wrapper was built.
    fn ts_event(&self) -> UnixNanos {
        self.cached_ts_event
    }

    /// Serializes the wrapped Python object to a JSON string.
    ///
    /// Uses the object's own `to_json()` method when it has one; otherwise the object's
    /// `__dict__` is passed to Python's `json.dumps`.
    ///
    /// # Errors
    /// Returns an error if the object has neither `to_json` nor `__dict__`, or if any
    /// Python call or string extraction fails.
    fn to_json(&self) -> anyhow::Result<String> {
        Python::attach(|py| {
            let obj = self.py_object.bind(py);

            // Preferred path: the object knows how to serialize itself.
            if obj.hasattr("to_json")? {
                let own_json = obj.call_method0("to_json")?;
                return Ok(own_json.extract::<String>()?);
            }

            // Fallback: dump the instance attribute dict with the standard `json` module.
            let json_module = py.import("json")?;
            if !obj.hasattr("__dict__")? {
                anyhow::bail!("Python object has no to_json() method or __dict__ attribute");
            }
            let attributes = obj.getattr("__dict__")?;
            let dumped = json_module.call_method1("dumps", (attributes,))?;
            Ok(dumped.extract::<String>()?)
        })
    }

    fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
        let cloned: Self = self.clone();
        Arc::new(cloned)
    }

    /// Compares against another custom data value.
    ///
    /// Values of any other concrete type are never equal. Two wrappers are equal when they
    /// refer to the same Python object, or otherwise when PyO3's `eq` comparison of the two
    /// objects reports equality; a comparison that raises is treated as "not equal". The
    /// cached type name and timestamps are deliberately not part of the comparison.
    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
        let Some(other_wrapper) = other.as_any().downcast_ref::<Self>() else {
            return false;
        };

        Python::attach(|py| {
            let lhs = self.py_object.bind(py);
            let rhs = other_wrapper.py_object.bind(py);
            lhs.is(rhs) || lhs.eq(rhs).unwrap_or(false)
        })
    }

    /// Hands back a new reference to the wrapped Python object itself.
    fn to_pyobject(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        let handle = self.py_object.clone_ref(py);
        Ok(handle)
    }
}
223
/// Returns the process-wide map from custom data type name to registered Python class,
/// creating it empty on first use.
#[cfg(feature = "python")]
fn python_data_classes() -> &'static dashmap::DashMap<String, Py<PyAny>> {
    type ClassMap = dashmap::DashMap<String, Py<PyAny>>;

    static REGISTRY: std::sync::OnceLock<ClassMap> = std::sync::OnceLock::new();
    REGISTRY.get_or_init(ClassMap::new)
}
230
/// Registers `data_class` as the Python class for custom data named `type_name`.
///
/// A class previously stored under the same name is replaced.
#[cfg(feature = "python")]
pub fn register_python_data_class(type_name: &str, data_class: &Bound<'_, PyAny>) {
    let key = type_name.to_string();
    let class_handle = data_class.clone().unbind();
    python_data_classes().insert(key, class_handle);
}
235
/// Looks up the Python class registered for `type_name`.
///
/// Returns a new reference to the class, or `None` when nothing is registered under
/// that name.
#[cfg(feature = "python")]
pub fn get_python_data_class(py: Python<'_>, type_name: &str) -> Option<Py<PyAny>> {
    let entry = python_data_classes().get(type_name)?;
    Some(entry.value().clone_ref(py))
}
242
/// Rebuilds a Python custom data instance from its type name and a JSON document.
///
/// The JSON text is parsed with Python's `json.loads` and the parsed value is passed to
/// the `from_json` method of the class registered for `type_name`.
///
/// # Errors
///
/// Returns a Python error if no class is registered for `type_name`, if the JSON cannot
/// be parsed, or if the class's `from_json` call fails.
#[cfg(feature = "python")]
pub fn reconstruct_python_custom_data(
    py: Python<'_>,
    type_name: &str,
    json: &str,
) -> PyResult<Py<PyAny>> {
    let missing_class = || {
        nautilus_core::python::to_pyruntime_err(format!(
            "No registered Python class for custom data type `{type_name}`"
        ))
    };
    let data_class = get_python_data_class(py, type_name).ok_or_else(missing_class)?;

    let payload = py.import("json")?.call_method1("loads", (json,))?;
    let instance = data_class.bind(py).call_method1("from_json", (payload,))?;
    Ok(instance.unbind())
}
266
/// Clones a PyO3-convertible custom data value and converts the clone into a Python object.
///
/// Intended for `#[pyclass]` custom data types, for which PyO3 already provides
/// `IntoPyObject` on owned values; the clone is what gets handed to Python.
///
/// # Errors
///
/// Returns any conversion error reported by PyO3.
#[cfg(feature = "python")]
pub fn clone_pyclass_to_pyobject<T>(value: &T, py: Python<'_>) -> PyResult<Py<PyAny>>
where
    T: Clone,
    for<'py> T: pyo3::IntoPyObject<'py, Error = pyo3::PyErr>,
{
    let owned: T = value.clone();
    owned.into_py_any(py)
}
283
284/// Trait for typed custom data that can be used within the Nautilus domain model.
285pub trait CustomDataTrait: HasTsInit + Send + Sync + Debug {
286    /// Returns the type name for the custom data.
287    fn type_name(&self) -> &'static str;
288
289    /// Returns the data as a `dyn Any` for downcasting.
290    fn as_any(&self) -> &dyn Any;
291
292    /// Returns the event timestamp (when the data occurred).
293    fn ts_event(&self) -> UnixNanos;
294
295    /// Serializes the custom data to a JSON string.
296    ///
297    /// # Errors
298    /// Returns an error if JSON serialization fails.
299    fn to_json(&self) -> anyhow::Result<String>;
300
301    /// Python-facing JSON serialization. Default implementation forwards to `to_json`.
302    /// Override if a different behavior is needed for the Python API.
303    ///
304    /// # Errors
305    /// Returns an error if JSON serialization fails.
306    fn to_json_py(&self) -> anyhow::Result<String> {
307        self.to_json()
308    }
309
310    /// Returns a cloned Arc of the custom data.
311    fn clone_arc(&self) -> Arc<dyn CustomDataTrait>;
312
313    /// Returns whether the custom data is equal to another.
314    fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool;
315
316    /// Converts the custom data to a Python object.
317    ///
318    /// # Errors
319    /// Returns an error if PyO3 conversion fails.
320    #[cfg(feature = "python")]
321    fn to_pyobject(&self, _py: Python<'_>) -> PyResult<Py<PyAny>> {
322        Err(nautilus_core::python::to_pytype_err(format!(
323            "to_pyobject not implemented for {}",
324            self.type_name()
325        )))
326    }
327
328    /// Returns the type name used in serialized form (e.g. in the `"type"` field).
329    fn type_name_static() -> &'static str
330    where
331        Self: Sized,
332    {
333        std::any::type_name::<Self>()
334    }
335
336    /// Deserializes from a JSON value into an Arc'd trait object.
337    ///
338    /// # Errors
339    /// Returns an error if JSON deserialization fails.
340    fn from_json(_value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>>
341    where
342        Self: Sized,
343    {
344        anyhow::bail!(
345            "from_json not implemented for {}",
346            std::any::type_name::<Self>()
347        )
348    }
349}
350
351/// Registers a custom data type for JSON deserialization. When `Data::deserialize`
352/// sees the type name returned by `T::type_name_static()`, it will call `T::from_json`.
353///
354/// # Errors
355/// Returns an error if the type is already registered.
356pub fn register_custom_data_json<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
357    let type_name = T::type_name_static();
358    register_json_deserializer(type_name, Box::new(|value| T::from_json(value)))
359}
360
361/// Registers a custom data type for JSON deserialization if not already registered.
362/// Idempotent: safe to call multiple times for the same type (e.g. module init).
363///
364/// # Errors
365/// Does not return an error (idempotent insert into DashMap).
366pub fn ensure_custom_data_json_registered<T: CustomDataTrait + Sized>() -> anyhow::Result<()> {
367    let type_name = T::type_name_static();
368    ensure_json_deserializer_registered(type_name, Box::new(|value| T::from_json(value)))
369}
370
/// A wrapper for custom data including its data type.
///
/// The `data` field holds an [`Arc`] to a [`CustomDataTrait`] implementation,
/// enabling cheap cloning when passing to Python (Arc clone is O(1)).
/// The implementation may be a Rust-defined type (optionally with PyO3 bindings) or,
/// with the `python` feature enabled, a `PythonCustomDataWrapper` around a Python object.
#[cfg_attr(
    feature = "python",
    pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.model",
        name = "CustomData",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
#[derive(Clone, Debug)]
pub struct CustomData {
    /// The actual data object implementing [`CustomDataTrait`].
    pub data: Arc<dyn CustomDataTrait>,
    /// The data type metadata (type name plus optional metadata and identifier).
    pub data_type: DataType,
}
395
396impl CustomData {
397    /// Creates a new [`CustomData`] instance from an [`Arc`]'d [`CustomDataTrait`],
398    /// deriving the data type from the inner type name.
399    pub fn from_arc(arc: Arc<dyn CustomDataTrait>) -> Self {
400        let data_type = DataType::new(arc.type_name(), None, None);
401        Self {
402            data: arc,
403            data_type,
404        }
405    }
406
407    /// Creates a new [`CustomData`] instance with explicit data type metadata.
408    ///
409    /// Use this when the data type must come from external metadata (e.g. Parquet),
410    /// rather than being derived from the inner type name.
411    pub fn new(data: Arc<dyn CustomDataTrait>, data_type: DataType) -> Self {
412        Self { data, data_type }
413    }
414}
415
416impl PartialEq for CustomData {
417    fn eq(&self, other: &Self) -> bool {
418        self.data.eq_arc(other.data.as_ref()) && self.data_type == other.data_type
419    }
420}
421
422impl HasTsInit for CustomData {
423    fn ts_init(&self) -> UnixNanos {
424        self.data.ts_init()
425    }
426}
427
428pub(crate) fn parse_custom_data_from_json_bytes(
429    bytes: &[u8],
430) -> Result<CustomData, serde_json::Error> {
431    let data: Data = serde_json::from_slice(bytes)?;
432    match data {
433        Data::Custom(custom) => Ok(custom),
434        _ => Err(serde_json::Error::io(std::io::Error::new(
435            std::io::ErrorKind::InvalidData,
436            "JSON does not represent CustomData",
437        ))),
438    }
439}
440
441impl CustomData {
442    /// Deserializes CustomData from JSON bytes (full CustomData format with type and data_type).
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the bytes are not valid JSON or do not represent CustomData.
447    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
448        parse_custom_data_from_json_bytes(bytes)
449    }
450}
451
452/// Canonical JSON envelope for CustomData. All serialized CustomData uses this shape so
453/// deserialization can extract the payload without depending on user payload field names.
454struct CustomDataEnvelope {
455    type_name: String,
456    data_type: serde_json::Value,
457    payload: serde_json::Value,
458}
459
460impl Serialize for CustomDataEnvelope {
461    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
462    where
463        S: Serializer,
464    {
465        use serde::ser::SerializeStruct;
466        let mut state = serializer.serialize_struct("CustomDataEnvelope", 3)?;
467        state.serialize_field("type", &self.type_name)?;
468        state.serialize_field("data_type", &self.data_type)?;
469        state.serialize_field("payload", &self.payload)?;
470        state.end()
471    }
472}
473
474impl CustomData {
475    fn to_envelope_json_value(&self) -> Result<serde_json::Value, serde_json::Error> {
476        let json = self.data.to_json().map_err(|e| {
477            serde_json::Error::io(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
478        })?;
479        let payload: serde_json::Value = serde_json::from_str(&json)?;
480        let metadata_value = self.data_type.metadata().map_or(
481            serde_json::Value::Object(serde_json::Map::new()),
482            |m| {
483                serde_json::to_value(m).unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
484            },
485        );
486        let mut data_type_obj = serde_json::Map::new();
487        data_type_obj.insert(
488            "type_name".to_string(),
489            serde_json::Value::String(self.data_type.type_name().to_string()),
490        );
491        data_type_obj.insert("metadata".to_string(), metadata_value);
492
493        if let Some(id) = self.data_type.identifier() {
494            data_type_obj.insert(
495                "identifier".to_string(),
496                serde_json::Value::String(id.to_string()),
497            );
498        }
499
500        let envelope = CustomDataEnvelope {
501            type_name: self.data.type_name().to_string(),
502            data_type: serde_json::Value::Object(data_type_obj),
503            payload,
504        };
505        serde_json::to_value(envelope)
506    }
507}
508
509impl Serialize for CustomData {
510    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
511    where
512        S: Serializer,
513    {
514        let value = self
515            .to_envelope_json_value()
516            .map_err(serde::ser::Error::custom)?;
517        value.serialize(serializer)
518    }
519}
520
#[cfg(test)]
mod tests {
    use nautilus_core::{Params, UnixNanos};
    use rstest::rstest;
    use serde::Deserialize;
    use serde_json::json;

    use super::*;
    use crate::{data::HasTsInit, identifiers::InstrumentId};

    /// Minimal Rust-defined custom data type used to exercise `CustomData`.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestCustomData {
        ts_init: UnixNanos,
        instrument_id: InstrumentId,
    }

    impl HasTsInit for TestCustomData {
        fn ts_init(&self) -> UnixNanos {
            self.ts_init
        }
    }

    impl CustomDataTrait for TestCustomData {
        fn type_name(&self) -> &'static str {
            "TestCustomData"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        // The test type has no separate event time, so `ts_init` doubles as `ts_event`.
        fn ts_event(&self) -> UnixNanos {
            self.ts_init
        }
        fn to_json(&self) -> anyhow::Result<String> {
            Ok(serde_json::to_string(self)?)
        }
        fn clone_arc(&self) -> Arc<dyn CustomDataTrait> {
            Arc::new(self.clone())
        }
        fn eq_arc(&self, other: &dyn CustomDataTrait) -> bool {
            if let Some(other) = other.as_any().downcast_ref::<Self>() {
                self == other
            } else {
                false
            }
        }

        // Overrides the default so the registered name matches `type_name()`.
        fn type_name_static() -> &'static str {
            "TestCustomData"
        }

        fn from_json(value: serde_json::Value) -> anyhow::Result<Arc<dyn CustomDataTrait>> {
            let parsed: Self = serde_json::from_value(value)?;
            Ok(Arc::new(parsed))
        }
    }

    /// Serializing a `CustomData` and parsing it back preserves the data type and payload.
    #[rstest]
    fn test_custom_data_json_roundtrip() {
        // Use the idempotent registration: `register_custom_data_json` is documented to
        // fail when the type name is already registered, which would make this test depend
        // on no other registration of "TestCustomData" having happened earlier.
        ensure_custom_data_json_registered::<TestCustomData>()
            .expect("TestCustomData must register for JSON roundtrip test");

        let instrument_id = InstrumentId::from("TEST.SIM");
        let metadata = Some(
            serde_json::from_value::<Params>(json!({"key1": "value1", "key2": "value2"})).unwrap(),
        );
        let inner = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new("TestCustomData", metadata, Some(instrument_id.to_string()));
        let original = CustomData::new(Arc::new(inner), data_type);

        let json_bytes = serde_json::to_vec(&original).unwrap();
        let roundtripped = CustomData::from_json_bytes(&json_bytes).unwrap();

        assert_eq!(
            roundtripped.data_type.type_name(),
            original.data_type.type_name()
        );
        assert_eq!(
            roundtripped.data_type.metadata(),
            original.data_type.metadata()
        );
        assert_eq!(
            roundtripped.data_type.identifier(),
            original.data_type.identifier()
        );
        let orig_inner = original
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        let rt_inner = roundtripped
            .data
            .as_any()
            .downcast_ref::<TestCustomData>()
            .unwrap();
        assert_eq!(orig_inner, rt_inner);
    }

    /// `CustomData` exposes the payload's `ts_init`, and `Data::Custom` reports the
    /// instrument ID given as the data type identifier.
    #[rstest]
    fn test_custom_data_wrapper() {
        let instrument_id = InstrumentId::from("TEST.SIM");
        let data = TestCustomData {
            ts_init: UnixNanos::from(100),
            instrument_id,
        };
        let data_type = DataType::new("TestCustomData", None, Some(instrument_id.to_string()));
        let custom_data = CustomData::new(Arc::new(data), data_type);

        assert_eq!(custom_data.data.ts_init(), UnixNanos::from(100));
        assert_eq!(Data::Custom(custom_data).instrument_id(), instrument_id);
    }
}