Skip to main content

nautilus_model/python/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18pub mod bar;
19pub mod bet;
20pub mod close;
21#[cfg(feature = "python")]
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod status;
34pub mod trade;
35
36#[cfg(feature = "ffi")]
37use nautilus_core::ffi::cvec::CVec;
38#[cfg(feature = "python")]
39use nautilus_core::python::{
40    params::{params_to_pydict, pydict_to_params},
41    to_pyruntime_err, to_pytype_err, to_pyvalue_err,
42};
43#[cfg(feature = "python")]
44use pyo3::types::PyDict;
45use pyo3::{prelude::*, types::PyCapsule};
46
47#[cfg(feature = "cython-compat")]
48use crate::data::DataFFI;
49use crate::data::{
50    Bar, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
51    MarkPriceUpdate, OrderBookDelta, QuoteTick, TradeTick, close::InstrumentClose,
52    is_monotonically_increasing_by_init, register_python_data_class,
53};
54
/// Message used for the Python `ValueError` raised by the `pyobjects_to_*` converters in this
/// module when the converted data fails the `is_monotonically_increasing_by_init` check.
const ERROR_MONOTONICITY: &str = "`data` was not monotonically increasing by the `ts_init` field";
56
57#[pymethods]
58#[cfg_attr(feature = "python", pyo3_stub_gen::derive::gen_stub_pymethods)]
59impl DataType {
60    /// Represents a data type including metadata.
61    #[new]
62    #[pyo3(signature = (type_name, metadata=None, identifier=None))]
63    fn py_new(
64        py: Python<'_>,
65        type_name: &str,
66        metadata: Option<Py<PyDict>>,
67        identifier: Option<String>,
68    ) -> PyResult<Self> {
69        let params = match metadata {
70            None => None,
71            Some(d) => pydict_to_params(py, &d)?,
72        };
73        Ok(Self::new(type_name, params, identifier))
74    }
75
76    fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp, py: Python<'_>) -> Py<PyAny> {
77        use nautilus_core::python::IntoPyObjectNautilusExt;
78
79        match op {
80            pyo3::pyclass::CompareOp::Eq => (self.topic() == other.topic()).into_py_any_unwrap(py),
81            pyo3::pyclass::CompareOp::Ne => (self.topic() != other.topic()).into_py_any_unwrap(py),
82            _ => py.NotImplemented(),
83        }
84    }
85
86    fn __hash__(&self) -> isize {
87        self.precomputed_hash() as isize
88    }
89
90    /// Returns the type name for the data type.
91    #[getter]
92    #[pyo3(name = "type_name")]
93    fn py_type_name(&self) -> &str {
94        self.type_name()
95    }
96
97    /// Returns the metadata for the data type.
98    #[getter]
99    #[pyo3(name = "metadata")]
100    fn py_metadata(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
101        match self.metadata() {
102            None => Ok(py.None()),
103            Some(p) => Ok(params_to_pydict(py, p)?
104                .bind(py)
105                .clone()
106                .into_any()
107                .unbind()),
108        }
109    }
110
111    /// Returns the messaging topic for the data type.
112    #[getter]
113    #[pyo3(name = "topic")]
114    fn py_topic(&self) -> &str {
115        self.topic()
116    }
117
118    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
119    #[getter]
120    #[pyo3(name = "identifier")]
121    fn py_identifier(&self) -> Option<&str> {
122        self.identifier()
123    }
124}
125
126/// Creates a Python `PyCapsule` object containing a Rust `Data` instance.
127///
128/// This function takes ownership of the `Data` instance and encapsulates it within
129/// a `PyCapsule` object, allowing the Rust data to be passed into the Python runtime.
130///
131/// # Capsule type contract
132///
133/// When conversion to `DataFFI` fails (e.g. for `Data::Custom`), this returns a
134/// capsule containing a single `Data` value (no destructor). That capsule must
135/// **never** be passed to [`drop_cvec_pycapsule`], which expects a `CVec` and
136/// would cause undefined behavior. Only capsules produced by code that creates
137/// `CVec` (e.g. for `capsule_to_list`) may be passed to `drop_cvec_pycapsule`.
138///
139/// # Panics
140///
141/// This function panics if the `PyCapsule` creation fails, which may occur if
142/// there are issues with memory allocation or if the `Data` instance cannot be
143/// properly encapsulated.
144#[must_use]
145pub fn data_to_pycapsule(py: Python, data: Data) -> Py<PyAny> {
146    #[cfg(feature = "cython-compat")]
147    {
148        // For Cython compatibility, we convert to DataFFI if possible.
149        if let Ok(ffi_data) = DataFFI::try_from(data.clone()) {
150            let capsule = PyCapsule::new_with_destructor(py, ffi_data, None, |_, _| {})
151                .expect("Error creating `PyCapsule` for `DataFFI` ");
152            return capsule.into_any().unbind();
153        }
154    }
155
156    // Default case for PyO3 or when conversion fails (e.g. Custom data)
157    let capsule = PyCapsule::new_with_destructor(py, data, None, |_, _| {})
158        .expect("Error creating `PyCapsule` for `Data` ");
159    capsule.into_any().unbind()
160}
161
/// Drops a `PyCapsule` containing a `CVec` structure.
///
/// The `CVec` stored in the capsule is reinterpreted as the raw parts of a
/// `Vec<DataFFI>`, which is rebuilt and dropped to release the backing buffer.
/// It is intended for cleaning up after the `Data` instances have been transferred
/// into Python (e.g. via `capsule_to_list`) and are no longer needed.
///
/// A `CVec` whose pointer is null is treated as having nothing to free.
///
/// # Capsule type contract
///
/// **Must only be called** on capsules that contain a `CVec` (pointer to `Vec<DataFFI>`).
/// Never pass a capsule from [`data_to_pycapsule`] here: when that function returns a
/// single-`Data` capsule (e.g. for `Data::Custom`), the pointer is not a `CVec`, and
/// calling this would be undefined behavior.
///
/// # Panics
///
/// Panics if `capsule` cannot be downcast to a `PyCapsule`, or if the capsule's
/// pointer cannot be retrieved.
///
/// # Safety
///
/// This function involves raw pointer dereferencing and manual memory management
/// even though it is exposed as a safe function. The caller must ensure the
/// `PyCapsule` contains a valid `CVec` pointer describing a `Vec<DataFFI>` allocation.
/// The capsule is not modified, so calling this twice on the same capsule would
/// free the same buffer twice.
#[cfg(feature = "ffi")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
#[allow(unsafe_code)]
pub fn drop_cvec_pycapsule(capsule: &Bound<'_, PyAny>) {
    let capsule: &Bound<'_, PyCapsule> = capsule
        .cast::<PyCapsule>()
        .expect("Error on downcast to `&PyCapsule`");
    let cvec_ptr = capsule
        .pointer_checked(None)
        .expect("Error retrieving pointer from `PyCapsule`")
        .as_ptr() as *const CVec;

    // SAFETY: per the capsule type contract, the capsule stores a valid `CVec`,
    // and the reference does not outlive the capsule borrowed by this call.
    let cvec: &CVec = unsafe { &*cvec_ptr };

    // `Vec::from_raw_parts` requires a non-null pointer, so a null `ptr` must not
    // reach it. NOTE(review): assumes an empty `CVec` is represented with a null
    // `ptr` and owns no allocation — confirm against `CVec::empty`.
    if cvec.ptr.is_null() {
        return;
    }

    // SAFETY: per the capsule type contract, `ptr`, `len` and `cap` are the raw
    // parts of a `Vec<DataFFI>` whose ownership was released when the `CVec` was
    // created, and they have not been reclaimed since.
    let data: Vec<crate::data::DataFFI> =
        unsafe { Vec::from_raw_parts(cvec.ptr.cast::<crate::data::DataFFI>(), cvec.len, cvec.cap) };
    drop(data);
}
195
196#[cfg(not(feature = "ffi"))]
197#[pyfunction]
198#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
199/// Drops a Python `PyCapsule` containing a `CVec` when the `ffi` feature is not enabled.
200///
201/// # Panics
202///
203/// Always panics with the message "`ffi` feature is not enabled" to indicate that
204/// FFI functionality is unavailable.
205pub fn drop_cvec_pycapsule(_capsule: &Bound<'_, PyAny>) {
206    panic!("`ffi` feature is not enabled");
207}
208
209/// Transforms the given Python objects into a vector of [`OrderBookDelta`] objects.
210///
211/// # Errors
212///
213/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
214pub fn pyobjects_to_book_deltas(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<OrderBookDelta>> {
215    let deltas: Vec<OrderBookDelta> = data
216        .into_iter()
217        .map(|obj| OrderBookDelta::from_pyobject(&obj))
218        .collect::<PyResult<Vec<OrderBookDelta>>>()?;
219
220    // Validate monotonically increasing
221    if !is_monotonically_increasing_by_init(&deltas) {
222        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
223    }
224
225    Ok(deltas)
226}
227
228/// Transforms the given Python objects into a vector of [`QuoteTick`] objects.
229///
230/// # Errors
231///
232/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
233pub fn pyobjects_to_quotes(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<QuoteTick>> {
234    let quotes: Vec<QuoteTick> = data
235        .into_iter()
236        .map(|obj| QuoteTick::from_pyobject(&obj))
237        .collect::<PyResult<Vec<QuoteTick>>>()?;
238
239    // Validate monotonically increasing
240    if !is_monotonically_increasing_by_init(&quotes) {
241        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
242    }
243
244    Ok(quotes)
245}
246
247/// Transforms the given Python objects into a vector of [`TradeTick`] objects.
248///
249/// # Errors
250///
251/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
252pub fn pyobjects_to_trades(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<TradeTick>> {
253    let trades: Vec<TradeTick> = data
254        .into_iter()
255        .map(|obj| TradeTick::from_pyobject(&obj))
256        .collect::<PyResult<Vec<TradeTick>>>()?;
257
258    // Validate monotonically increasing
259    if !is_monotonically_increasing_by_init(&trades) {
260        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
261    }
262
263    Ok(trades)
264}
265
266/// Transforms the given Python objects into a vector of [`Bar`] objects.
267///
268/// # Errors
269///
270/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
271pub fn pyobjects_to_bars(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<Bar>> {
272    let bars: Vec<Bar> = data
273        .into_iter()
274        .map(|obj| Bar::from_pyobject(&obj))
275        .collect::<PyResult<Vec<Bar>>>()?;
276
277    // Validate monotonically increasing
278    if !is_monotonically_increasing_by_init(&bars) {
279        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
280    }
281
282    Ok(bars)
283}
284
285/// Transforms the given Python objects into a vector of [`MarkPriceUpdate`] objects.
286///
287/// # Errors
288///
289/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
290pub fn pyobjects_to_mark_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<MarkPriceUpdate>> {
291    let mark_prices: Vec<MarkPriceUpdate> = data
292        .into_iter()
293        .map(|obj| MarkPriceUpdate::from_pyobject(&obj))
294        .collect::<PyResult<Vec<MarkPriceUpdate>>>()?;
295
296    // Validate monotonically increasing
297    if !is_monotonically_increasing_by_init(&mark_prices) {
298        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
299    }
300
301    Ok(mark_prices)
302}
303
304/// Transforms the given Python objects into a vector of [`IndexPriceUpdate`] objects.
305///
306/// # Errors
307///
308/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
309pub fn pyobjects_to_index_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<IndexPriceUpdate>> {
310    let index_prices: Vec<IndexPriceUpdate> = data
311        .into_iter()
312        .map(|obj| IndexPriceUpdate::from_pyobject(&obj))
313        .collect::<PyResult<Vec<IndexPriceUpdate>>>()?;
314
315    // Validate monotonically increasing
316    if !is_monotonically_increasing_by_init(&index_prices) {
317        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
318    }
319
320    Ok(index_prices)
321}
322
323/// Transforms the given Python objects into a vector of [`InstrumentStatus`] objects.
324///
325/// # Errors
326///
327/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
328pub fn pyobjects_to_instrument_statuses(
329    data: Vec<Bound<'_, PyAny>>,
330) -> PyResult<Vec<InstrumentStatus>> {
331    let statuses: Vec<InstrumentStatus> = data
332        .into_iter()
333        .map(|obj| InstrumentStatus::from_pyobject(&obj))
334        .collect::<PyResult<Vec<InstrumentStatus>>>()?;
335
336    if !is_monotonically_increasing_by_init(&statuses) {
337        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
338    }
339
340    Ok(statuses)
341}
342
343/// Transforms the given Python objects into a vector of [`InstrumentClose`] objects.
344///
345/// # Errors
346///
347/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
348pub fn pyobjects_to_instrument_closes(
349    data: Vec<Bound<'_, PyAny>>,
350) -> PyResult<Vec<InstrumentClose>> {
351    let closes: Vec<InstrumentClose> = data
352        .into_iter()
353        .map(|obj| InstrumentClose::from_pyobject(&obj))
354        .collect::<PyResult<Vec<InstrumentClose>>>()?;
355
356    // Validate monotonically increasing
357    if !is_monotonically_increasing_by_init(&closes) {
358        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
359    }
360
361    Ok(closes)
362}
363
/// Deserializes custom data from JSON bytes into a PyO3 `CustomData` wrapper.
///
/// The payload is parsed as JSON first, then handed to the custom-data registry,
/// which looks up the deserializer by `type_name`.
///
/// # Errors
///
/// Returns a `PyErr` (value error) if the payload is not valid JSON, if the registry
/// deserializer fails, or if the registry does not yield a custom data value for
/// `type_name` (reported as the type not being registered).
#[cfg(feature = "python")]
#[pyfunction]
pub fn deserialize_custom_from_json(type_name: &str, payload: &[u8]) -> PyResult<CustomData> {
    use crate::data::registry;

    let value = serde_json::from_slice::<serde_json::Value>(payload)
        .map_err(|e| to_pyvalue_err(format!("Invalid JSON: {e}")))?;

    let deserialized = registry::deserialize_custom_from_json(type_name, &value)
        .map_err(|e| to_pyvalue_err(format!("Deserialization failed: {e}")))?;

    // Anything other than a custom data value is reported as "not registered".
    match deserialized {
        Some(Data::Custom(custom)) => Ok(custom),
        _ => Err(to_pyvalue_err(format!(
            "Custom data type \"{type_name}\" is not registered"
        ))),
    }
}
384
/// Builds a custom data value from a JSON value by calling the Python class's `from_json`.
///
/// The JSON value is serialized to text, parsed on the Python side with `json.loads`,
/// and the result is passed to `data_class.from_json`. The returned instance is wrapped
/// in a `PythonCustomDataWrapper`.
///
/// # Errors
///
/// Returns an error if serialization fails, the `json` module cannot be imported, the
/// text cannot be parsed, `from_json` raises, or the wrapper cannot be created.
#[cfg(feature = "python")]
fn py_json_deserialize_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    value: &serde_json::Value,
) -> Result<std::sync::Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        // Round-trip through JSON text so Python builds its own native objects.
        let json_text = serde_json::to_string(value)?;
        let parsed = py
            .import("json")
            .map_err(|e| anyhow::anyhow!("Failed to import json: {e}"))?
            .call_method1("loads", (json_text,))
            .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {e}"))?;

        let instance = data_class
            .bind(py)
            .call_method1("from_json", (parsed,))
            .map_err(|e| anyhow::anyhow!("Failed to call from_json: {e}"))?;

        let wrapper = PythonCustomDataWrapper::new(py, &instance)
            .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;

        let custom: Arc<dyn crate::data::CustomDataTrait> = Arc::new(wrapper);
        Ok(custom)
    })
}
415
/// Encodes `CustomData` items to `RecordBatch` via Python `encode_record_batch_py`.
///
/// Every item is converted to its Python object, the first object's
/// `encode_record_batch_py(items)` method is called with the full list, and the
/// resulting batch is brought back into Rust through its `_export_to_c` method.
///
/// # Errors
///
/// Returns an error if any item cannot be converted to a Python object, if `items`
/// is empty, if the first object has no `encode_record_batch_py` method, if the
/// Python call raises, or if the exported data cannot be imported as Arrow data.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_encode_custom_data_to_record_batch(
    items: &[std::sync::Arc<dyn crate::data::CustomDataTrait>],
) -> Result<arrow::record_batch::RecordBatch, anyhow::Error> {
    pyo3::Python::attach(|py| {
        let py_items = items
            .iter()
            .map(|item| item.to_pyobject(py))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| anyhow::anyhow!("Failed to convert to Python: {e}"))?;
        let py_list = pyo3::types::PyList::new(py, &py_items)
            .map_err(|e| anyhow::anyhow!("Failed to create list: {e}"))?;

        // The first element supplies the encoder method; reuse the object that was
        // already converted instead of converting the first item a second time.
        let encoder_owner = py_items
            .first()
            .ok_or_else(|| anyhow::anyhow!("No items to encode"))?
            .bind(py);

        // A failing attribute lookup is treated the same as a missing method.
        if !encoder_owner
            .hasattr("encode_record_batch_py")
            .unwrap_or(false)
        {
            anyhow::bail!("Instances must have encode_record_batch_py method");
        }

        let py_batch = encoder_owner
            .call_method1("encode_record_batch_py", (py_list,))
            .map_err(|e| anyhow::anyhow!("Failed to call encode_record_batch_py: {e}"))?;

        // Empty FFI structs for the Python side to fill in; their addresses are
        // passed as integers.
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty();
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::empty();

        py_batch.call_method1(
            "_export_to_c",
            (
                (&raw mut ffi_array as usize),
                (&raw mut ffi_schema as usize),
            ),
        )?;

        let schema = arrow::datatypes::Schema::try_from(&ffi_schema)?;

        // SAFETY: `ffi_array` was populated by the `_export_to_c` call above, and the
        // struct data type is built from the schema exported alongside it in that
        // same call, so the array layout and the declared type describe the same batch.
        let struct_array_data = unsafe {
            arrow::ffi::from_ffi_and_data_type(
                ffi_array,
                arrow::datatypes::DataType::Struct(schema.fields().clone()),
            )?
        };
        let struct_array = arrow::array::StructArray::from(struct_array_data);
        Ok(arrow::record_batch::RecordBatch::from(&struct_array))
    })
}
468
/// Converts a Python schema object into a Rust Arrow `Schema`.
///
/// Calls the object's `_export_to_c` method with the address of an empty
/// `FFI_ArrowSchema`, then converts the filled-in struct.
///
/// # Errors
///
/// Returns a `PyErr` if the `_export_to_c` call raises, or a value error if the
/// exported schema cannot be converted.
#[cfg(all(feature = "python", feature = "arrow"))]
fn pyarrow_schema_to_arrow_schema(
    py_schema: &pyo3::Bound<'_, pyo3::PyAny>,
) -> PyResult<arrow::datatypes::Schema> {
    let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::empty();

    // The Python side receives the struct's address as a plain integer.
    let schema_addr = &raw mut ffi_schema as usize;
    py_schema.call_method1("_export_to_c", (schema_addr,))?;

    let imported = arrow::datatypes::Schema::try_from(&ffi_schema);
    imported.map_err(|e| to_pyvalue_err(format!("Failed to import PyArrow schema: {e}")))
}
478
/// Decodes `RecordBatch` to `CustomData` via Python `decode_record_batch_py`.
///
/// The batch is exposed to Python as a `pyarrow.RecordBatch` (built with
/// `_import_from_c`), `metadata` is copied into a Python dict, and
/// `data_class.decode_record_batch_py(metadata, batch)` is called. Each element of
/// the returned list is wrapped in a `PythonCustomDataWrapper` and returned as
/// `Data::Custom`.
///
/// # Errors
///
/// Returns an error if the Arrow export fails, `pyarrow` cannot be imported, the
/// Python call raises, the result is not a list, or an element cannot be wrapped.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_decode_record_batch_to_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    metadata: &std::collections::HashMap<String, String>,
    batch: arrow::record_batch::RecordBatch,
) -> Result<Vec<crate::data::Data>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        // View the batch as one struct array so it can be exported as a single array.
        let struct_array: arrow::array::StructArray = batch.into();
        let array_data = arrow::array::Array::to_data(&struct_array);
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::new(&array_data);

        let arrow::datatypes::DataType::Struct(fields) =
            arrow::array::Array::data_type(&struct_array)
        else {
            unreachable!()
        };
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::try_from(
            arrow::datatypes::DataType::Struct(fields.clone()),
        )?;

        // Hand both FFI structs to pyarrow by address.
        let py_batch = py
            .import("pyarrow")?
            .getattr("RecordBatch")?
            .call_method1(
                "_import_from_c",
                (
                    (&raw mut ffi_array as usize),
                    (&raw mut ffi_schema as usize),
                ),
            )?;

        let metadata_py = pyo3::types::PyDict::new(py);
        for (key, val) in metadata {
            metadata_py.set_item(key, val)?;
        }

        let decoded = data_class
            .bind(py)
            .call_method1("decode_record_batch_py", (metadata_py, py_batch))
            .map_err(|e| anyhow::anyhow!("Failed to call decode_record_batch_py: {e}"))?;

        let list = decoded
            .cast::<pyo3::types::PyList>()
            .map_err(|_| anyhow::anyhow!("Expected list from decode_record_batch_py"))?;

        // Wrap each decoded Python instance; stop at the first failure.
        list.iter()
            .map(|item| {
                let wrapper = PythonCustomDataWrapper::new(py, &item)
                    .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;
                Ok(crate::data::Data::Custom(
                    crate::data::CustomData::from_arc(Arc::new(wrapper)),
                ))
            })
            .collect::<Result<Vec<crate::data::Data>, anyhow::Error>>()
    })
}
537
/// Registers a custom data **type** (class) with the catalog registry.
///
/// Use this when you prefer to pass the class instead of a sample instance.
/// The class must have:
/// - `type_name_static()` class method or `__name__` (used as type name in storage)
/// - `from_json(data)` class method
/// - `decode_record_batch_py(metadata, batch)` class method (checked only when the
///   `arrow` feature is enabled)
/// - Instances must have `ts_event`, `ts_init` and `encode_record_batch_py(items)`.
///
/// When the `arrow` feature is enabled, the Arrow schema is taken from the class's
/// `_schema` attribute if it exposes `_export_to_c`; otherwise from the registry's
/// schema for the type name; otherwise an empty schema is used.
///
/// # Arguments
///
/// * `data_class` - The custom data class (e.g. `MarketTickPython` or `module.MarketTickData`)
///
/// # Errors
///
/// Returns a `PyErr` if the class lacks required methods, if its type name cannot be
/// read, if its `_schema` cannot be imported, or if registering the JSON deserializer
/// or the Arrow encoder/decoder fails.
///
/// # Example
///
/// ```python
/// from nautilus_trader.model.custom import customdataclass_pyo3
/// from nautilus_trader.model import register_custom_data_class
///
/// @customdataclass_pyo3()
/// class MarketTickPython:
///     symbol: str = ""
///     price: float = 0.0
///     volume: int = 0
///
/// register_custom_data_class(MarketTickPython)
/// ```
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
pub fn register_custom_data_class(data_class: &Bound<'_, PyAny>) -> PyResult<()> {
    use std::sync::Arc;

    use crate::data::registry;

    // Prefer the explicit storage name; fall back to the Python class name.
    let type_name: String = if data_class.hasattr("type_name_static")? {
        data_class.call_method0("type_name_static")?.extract()?
    } else {
        data_class.getattr("__name__")?.extract()?
    };

    // Validate the required class methods before registering anything.
    #[cfg(feature = "arrow")]
    if !data_class.hasattr("decode_record_batch_py")? {
        return Err(to_pytype_err(
            "Custom data class must have decode_record_batch_py(metadata, batch) class method",
        ));
    }

    if !data_class.hasattr("from_json")? {
        return Err(to_pytype_err(
            "Custom data class must have from_json(data) class method (Rust macro provides it)",
        ));
    }

    register_python_data_class(&type_name, data_class);

    // Best effort: the result of registering the extractor is deliberately ignored.
    if let Some(extractor) = registry::get_rust_extractor(&type_name) {
        let _ = registry::ensure_py_extractor_registered(&type_name, extractor);
    }

    let data_class_for_json = data_class.clone().unbind();

    // The helper attaches to the interpreter itself and only borrows the class,
    // so the closure can pass its captured handle straight through.
    let json_deserializer = Box::new(
        move |value: serde_json::Value| -> Result<Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
            py_json_deserialize_custom_data(&data_class_for_json, &value)
        },
    );

    registry::ensure_json_deserializer_registered(&type_name, json_deserializer).map_err(|e| {
        to_pyruntime_err(format!(
            "Failed to register JSON deserializer for {type_name}: {e}"
        ))
    })?;

    #[cfg(feature = "arrow")]
    {
        let data_class_for_decode = data_class.clone().unbind();

        // Schema resolution order: the class's own `_schema` (when it can be
        // exported), then the registry's schema, then an empty schema.
        let pyarrow_schema = data_class
            .getattr("_schema")
            .ok()
            .filter(|s| s.hasattr("_export_to_c").unwrap_or(false));
        let schema = if let Some(py_schema) = pyarrow_schema {
            Arc::new(pyarrow_schema_to_arrow_schema(&py_schema)?)
        } else if let Some(schema) = registry::get_arrow_schema(&type_name) {
            schema
        } else {
            Arc::new(arrow::datatypes::Schema::empty())
        };

        let encoder = Box::new(
            move |items: &[Arc<dyn crate::data::CustomDataTrait>]| -> Result<
                arrow::record_batch::RecordBatch,
                anyhow::Error,
            > { py_encode_custom_data_to_record_batch(items) },
        );

        // As with the JSON closure, the helper attaches to the interpreter itself.
        let decoder = Box::new(
            move |metadata: &std::collections::HashMap<String, String>,
                  batch: arrow::record_batch::RecordBatch|
                  -> Result<Vec<crate::data::Data>, anyhow::Error> {
                py_decode_record_batch_to_custom_data(&data_class_for_decode, metadata, batch)
            },
        );

        registry::ensure_arrow_registered(&type_name, schema, encoder, decoder).map_err(|e| {
            to_pyruntime_err(format!(
                "Failed to register Arrow encoder/decoder for {type_name}: {e}"
            ))
        })?;
    }

    Ok(())
}
664
665/// Transforms the given Python objects into a vector of [`FundingRateUpdate`] objects.
666///
667/// # Errors
668///
669/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
670pub fn pyobjects_to_funding_rates(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<FundingRateUpdate>> {
671    let funding_rates: Vec<FundingRateUpdate> = data
672        .into_iter()
673        .map(|obj| FundingRateUpdate::from_pyobject(&obj))
674        .collect::<PyResult<Vec<FundingRateUpdate>>>()?;
675
676    // Validate monotonically increasing
677    if !is_monotonically_increasing_by_init(&funding_rates) {
678        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
679    }
680
681    Ok(funding_rates)
682}