Skip to main content

nautilus_model/python/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18#[cfg(feature = "ffi")]
19use std::ffi::{CStr, CString};
20
21pub mod bar;
22pub mod bet;
23pub mod close;
24#[cfg(feature = "python")]
25pub mod custom;
26pub mod delta;
27pub mod deltas;
28pub mod depth;
29pub mod forward;
30pub mod funding;
31pub mod greeks;
32pub mod option_chain;
33pub mod order;
34pub mod prices;
35pub mod quote;
36pub mod status;
37pub mod trade;
38
39#[cfg(feature = "ffi")]
40use nautilus_core::ffi::cvec::CVec;
41#[cfg(feature = "python")]
42use nautilus_core::python::{
43    params::{params_to_pydict, pydict_to_params},
44    to_pyruntime_err, to_pytype_err, to_pyvalue_err,
45};
46#[cfg(feature = "python")]
47use pyo3::types::PyDict;
48use pyo3::{prelude::*, types::PyCapsule};
49
50#[cfg(any(feature = "cython-compat", feature = "ffi"))]
51use crate::data::DataFFI;
52use crate::data::{
53    Bar, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
54    MarkPriceUpdate, OptionGreeks, OrderBookDelta, QuoteTick, TradeTick, close::InstrumentClose,
55    is_monotonically_increasing_by_init, register_python_data_class,
56};
57
/// Message for the error returned when a converted sequence fails the `ts_init` monotonicity check.
const ERROR_MONOTONICITY: &str = "`data` was not monotonically increasing by the `ts_init` field";
59
/// Name attached to Python capsules that carry a [`DataFfiCVec`].
#[cfg(feature = "ffi")]
pub const DATA_FFI_CVEC_CAPSULE_NAME: &CStr = c"nautilus.DataFFI.CVec";

/// Transparent newtype over [`CVec`] used as the payload of `DataFFI` vector capsules.
#[cfg(feature = "ffi")]
#[repr(transparent)]
#[derive(Debug)]
pub struct DataFfiCVec(CVec);

#[cfg(feature = "ffi")]
impl DataFfiCVec {
    /// Returns an owned copy of [`DATA_FFI_CVEC_CAPSULE_NAME`].
    #[must_use]
    pub fn capsule_name() -> CString {
        CString::from(DATA_FFI_CVEC_CAPSULE_NAME)
    }
}

#[cfg(feature = "ffi")]
impl From<Vec<DataFFI>> for DataFfiCVec {
    /// Converts the vector into a [`CVec`] and wraps it.
    fn from(data: Vec<DataFFI>) -> Self {
        Self(CVec::from(data))
    }
}

#[cfg(feature = "ffi")]
#[allow(unsafe_code)]
// SAFETY: DataFfiCVec only wraps CVec allocations produced from Vec<DataFFI>.
// DataFFI is the type-specific payload for these Python capsules, and the
// capsule transfers ownership metadata without sharing mutable access.
unsafe impl Send for DataFfiCVec {}
89
90#[pymethods]
91#[cfg_attr(feature = "python", pyo3_stub_gen::derive::gen_stub_pymethods)]
92impl DataType {
93    /// Represents a data type including metadata.
94    #[new]
95    #[pyo3(signature = (type_name, metadata=None, identifier=None))]
96    fn py_new(
97        py: Python<'_>,
98        type_name: &str,
99        metadata: Option<Py<PyDict>>,
100        identifier: Option<String>,
101    ) -> PyResult<Self> {
102        let params = match metadata {
103            None => None,
104            Some(d) => pydict_to_params(py, &d)?,
105        };
106        Ok(Self::new(type_name, params, identifier))
107    }
108
109    fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp, py: Python<'_>) -> Py<PyAny> {
110        use nautilus_core::python::IntoPyObjectNautilusExt;
111
112        match op {
113            pyo3::pyclass::CompareOp::Eq => (self.topic() == other.topic()).into_py_any_unwrap(py),
114            pyo3::pyclass::CompareOp::Ne => (self.topic() != other.topic()).into_py_any_unwrap(py),
115            _ => py.NotImplemented(),
116        }
117    }
118
119    fn __hash__(&self) -> isize {
120        self.precomputed_hash() as isize
121    }
122
123    /// Returns the type name for the data type.
124    #[getter]
125    #[pyo3(name = "type_name")]
126    fn py_type_name(&self) -> &str {
127        self.type_name()
128    }
129
130    /// Returns the metadata for the data type.
131    #[getter]
132    #[pyo3(name = "metadata")]
133    fn py_metadata(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
134        match self.metadata() {
135            None => Ok(py.None()),
136            Some(p) => Ok(params_to_pydict(py, p)?
137                .bind(py)
138                .clone()
139                .into_any()
140                .unbind()),
141        }
142    }
143
144    /// Returns the messaging topic for the data type.
145    #[getter]
146    #[pyo3(name = "topic")]
147    fn py_topic(&self) -> &str {
148        self.topic()
149    }
150
151    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
152    #[getter]
153    #[pyo3(name = "identifier")]
154    fn py_identifier(&self) -> Option<&str> {
155        self.identifier()
156    }
157}
158
159/// Creates a Python `PyCapsule` object containing a Rust `Data` instance.
160///
161/// This function takes ownership of the `Data` instance and encapsulates it within
162/// a `PyCapsule` object, allowing the Rust data to be passed into the Python runtime.
163///
164/// # Capsule type contract
165///
166/// When conversion to `DataFFI` fails (e.g. for `Data::Custom`), this returns a
167/// capsule containing a single `Data` value (no destructor). That capsule must
168/// **never** be passed to [`drop_cvec_pycapsule`], which expects a `CVec` and
169/// would cause undefined behavior. Only capsules produced by code that creates
170/// `CVec` (e.g. for `capsule_to_list`) may be passed to `drop_cvec_pycapsule`.
171///
172/// # Panics
173///
174/// This function panics if the `PyCapsule` creation fails, which may occur if
175/// there are issues with memory allocation or if the `Data` instance cannot be
176/// properly encapsulated.
177#[must_use]
178pub fn data_to_pycapsule(py: Python, data: Data) -> Py<PyAny> {
179    #[cfg(feature = "cython-compat")]
180    {
181        // For Cython compatibility, we convert to DataFFI if possible.
182        if let Ok(ffi_data) = DataFFI::try_from(data.clone()) {
183            let capsule = PyCapsule::new_with_destructor(py, ffi_data, None, |_, _| {})
184                .expect("Error creating `PyCapsule` for `DataFFI` ");
185            return capsule.into_any().unbind();
186        }
187    }
188
189    // Default case for PyO3 or when conversion fails (e.g. Custom data)
190    let capsule = PyCapsule::new_with_destructor(py, data, None, |_, _| {})
191        .expect("Error creating `PyCapsule` for `Data` ");
192    capsule.into_any().unbind()
193}
194
/// Drops a `PyCapsule` containing a `CVec` structure.
///
/// This function safely extracts and drops the `CVec` instance encapsulated within
/// a `PyCapsule` object. It is intended for cleaning up after the `Data` instances
/// have been transferred into Python (e.g. via `capsule_to_list`) and are no longer needed.
///
/// # Capsule type contract
///
/// **Must only be called** on capsules that contain a `CVec` (pointer to `Vec<DataFFI>`).
/// Never pass a capsule from [`data_to_pycapsule`] here: when that function returns a
/// single-`Data` capsule (e.g. for `Data::Custom`), the pointer is not a `CVec`, and
/// calling this would be undefined behavior.
///
/// # Errors
///
/// Returns a `PyErr` if the object is not a `PyCapsule`, has the wrong capsule
/// name, or contains invalid `CVec` metadata.
///
/// This function involves raw pointer dereferencing and manual memory
/// management. The caller must ensure the `PyCapsule` contains a valid `CVec` pointer.
#[cfg(feature = "ffi")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
#[allow(unsafe_code)]
pub fn drop_cvec_pycapsule(capsule: &Bound<'_, PyAny>) -> PyResult<()> {
    let capsule: &Bound<'_, PyCapsule> = capsule
        .cast::<PyCapsule>()
        .map_err(|e| to_pyvalue_err(format!("Expected DataFFI CVec PyCapsule: {e}")))?;
    let slot = capsule
        .pointer_checked(Some(DATA_FFI_CVEC_CAPSULE_NAME))
        .map_err(|e| to_pyvalue_err(format!("Invalid DataFFI CVec PyCapsule: {e}")))?
        .as_ptr()
        .cast::<CVec>();
    // SAFETY: The capsule name check above verifies this is a DataFfiCVec, whose
    // transparent representation starts with the CVec metadata.
    let CVec { ptr, len, cap } = unsafe { *slot };

    // An all-zero length/capacity pair owns no allocation and skips validation.
    let owns_nothing = len == 0 && cap == 0;

    if !owns_nothing {
        if len > cap {
            return Err(to_pyvalue_err(format!(
                "Invalid DataFFI CVec metadata: len ({}) > cap ({})",
                len, cap
            )));
        }

        if ptr.is_null() {
            return Err(to_pyvalue_err(format!(
                "Invalid DataFFI CVec metadata: null ptr with len ({}) and cap ({})",
                len, cap
            )));
        }
    }

    // SAFETY: The pointer targets the CVec metadata inside the checked capsule.
    // Reset it before reconstructing the Vec so repeated calls do not double free.
    unsafe {
        *slot = CVec::empty();
    }

    if owns_nothing {
        return Ok(());
    }

    // SAFETY: The metadata came from CVec::from(Vec<DataFFI>) and was validated above.
    let reclaimed: Vec<DataFFI> = unsafe { Vec::from_raw_parts(ptr.cast::<DataFFI>(), len, cap) };
    drop(reclaimed);
    Ok(())
}
266
267#[cfg(not(feature = "ffi"))]
268#[pyfunction]
269#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
270/// Drops a Python `PyCapsule` containing a `CVec` when the `ffi` feature is not enabled.
271///
272/// # Errors
273///
274/// Always returns a `PyErr` with the message "`ffi` feature is not enabled" to indicate that
275/// FFI functionality is unavailable.
276pub fn drop_cvec_pycapsule(_capsule: &Bound<'_, PyAny>) -> PyResult<()> {
277    Err(to_pyruntime_err("`ffi` feature is not enabled"))
278}
279
280/// Transforms the given Python objects into a vector of [`OrderBookDelta`] objects.
281///
282/// # Errors
283///
284/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
285pub fn pyobjects_to_book_deltas(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<OrderBookDelta>> {
286    let deltas: Vec<OrderBookDelta> = data
287        .into_iter()
288        .map(|obj| OrderBookDelta::from_pyobject(&obj))
289        .collect::<PyResult<Vec<OrderBookDelta>>>()?;
290
291    // Validate monotonically increasing
292    if !is_monotonically_increasing_by_init(&deltas) {
293        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
294    }
295
296    Ok(deltas)
297}
298
299/// Transforms the given Python objects into a vector of [`QuoteTick`] objects.
300///
301/// # Errors
302///
303/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
304pub fn pyobjects_to_quotes(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<QuoteTick>> {
305    let quotes: Vec<QuoteTick> = data
306        .into_iter()
307        .map(|obj| QuoteTick::from_pyobject(&obj))
308        .collect::<PyResult<Vec<QuoteTick>>>()?;
309
310    // Validate monotonically increasing
311    if !is_monotonically_increasing_by_init(&quotes) {
312        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
313    }
314
315    Ok(quotes)
316}
317
318/// Transforms the given Python objects into a vector of [`TradeTick`] objects.
319///
320/// # Errors
321///
322/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
323pub fn pyobjects_to_trades(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<TradeTick>> {
324    let trades: Vec<TradeTick> = data
325        .into_iter()
326        .map(|obj| TradeTick::from_pyobject(&obj))
327        .collect::<PyResult<Vec<TradeTick>>>()?;
328
329    // Validate monotonically increasing
330    if !is_monotonically_increasing_by_init(&trades) {
331        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
332    }
333
334    Ok(trades)
335}
336
337/// Transforms the given Python objects into a vector of [`Bar`] objects.
338///
339/// # Errors
340///
341/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
342pub fn pyobjects_to_bars(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<Bar>> {
343    let bars: Vec<Bar> = data
344        .into_iter()
345        .map(|obj| Bar::from_pyobject(&obj))
346        .collect::<PyResult<Vec<Bar>>>()?;
347
348    // Validate monotonically increasing
349    if !is_monotonically_increasing_by_init(&bars) {
350        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
351    }
352
353    Ok(bars)
354}
355
356/// Transforms the given Python objects into a vector of [`MarkPriceUpdate`] objects.
357///
358/// # Errors
359///
360/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
361pub fn pyobjects_to_mark_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<MarkPriceUpdate>> {
362    let mark_prices: Vec<MarkPriceUpdate> = data
363        .into_iter()
364        .map(|obj| MarkPriceUpdate::from_pyobject(&obj))
365        .collect::<PyResult<Vec<MarkPriceUpdate>>>()?;
366
367    // Validate monotonically increasing
368    if !is_monotonically_increasing_by_init(&mark_prices) {
369        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
370    }
371
372    Ok(mark_prices)
373}
374
375/// Transforms the given Python objects into a vector of [`IndexPriceUpdate`] objects.
376///
377/// # Errors
378///
379/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
380pub fn pyobjects_to_index_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<IndexPriceUpdate>> {
381    let index_prices: Vec<IndexPriceUpdate> = data
382        .into_iter()
383        .map(|obj| IndexPriceUpdate::from_pyobject(&obj))
384        .collect::<PyResult<Vec<IndexPriceUpdate>>>()?;
385
386    // Validate monotonically increasing
387    if !is_monotonically_increasing_by_init(&index_prices) {
388        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
389    }
390
391    Ok(index_prices)
392}
393
394/// Transforms the given Python objects into a vector of [`InstrumentStatus`] objects.
395///
396/// # Errors
397///
398/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
399pub fn pyobjects_to_instrument_statuses(
400    data: Vec<Bound<'_, PyAny>>,
401) -> PyResult<Vec<InstrumentStatus>> {
402    let statuses: Vec<InstrumentStatus> = data
403        .into_iter()
404        .map(|obj| InstrumentStatus::from_pyobject(&obj))
405        .collect::<PyResult<Vec<InstrumentStatus>>>()?;
406
407    if !is_monotonically_increasing_by_init(&statuses) {
408        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
409    }
410
411    Ok(statuses)
412}
413
414/// Transforms the given Python objects into a vector of [`OptionGreeks`] objects.
415///
416/// # Errors
417///
418/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
419pub fn pyobjects_to_option_greeks(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<OptionGreeks>> {
420    let greeks: Vec<OptionGreeks> = data
421        .into_iter()
422        .map(|obj| OptionGreeks::from_pyobject(&obj))
423        .collect::<PyResult<Vec<OptionGreeks>>>()?;
424
425    if !is_monotonically_increasing_by_init(&greeks) {
426        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
427    }
428
429    Ok(greeks)
430}
431
432/// Transforms the given Python objects into a vector of [`InstrumentClose`] objects.
433///
434/// # Errors
435///
436/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
437pub fn pyobjects_to_instrument_closes(
438    data: Vec<Bound<'_, PyAny>>,
439) -> PyResult<Vec<InstrumentClose>> {
440    let closes: Vec<InstrumentClose> = data
441        .into_iter()
442        .map(|obj| InstrumentClose::from_pyobject(&obj))
443        .collect::<PyResult<Vec<InstrumentClose>>>()?;
444
445    // Validate monotonically increasing
446    if !is_monotonically_increasing_by_init(&closes) {
447        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
448    }
449
450    Ok(closes)
451}
452
/// Deserializes custom data from JSON bytes into a PyO3 `CustomData` wrapper.
///
/// # Errors
///
/// Returns a `PyErr` if the type is not registered or JSON deserialization fails.
#[cfg(feature = "python")]
#[pyfunction]
pub fn deserialize_custom_from_json(type_name: &str, payload: &[u8]) -> PyResult<CustomData> {
    use crate::data::registry;

    // Parse the raw bytes first so malformed JSON is reported separately from
    // registry failures.
    let json: serde_json::Value = match serde_json::from_slice(payload) {
        Ok(parsed) => parsed,
        Err(e) => return Err(to_pyvalue_err(format!("Invalid JSON: {e}"))),
    };

    match registry::deserialize_custom_from_json(type_name, &json) {
        Ok(Some(Data::Custom(custom))) => Ok(custom),
        // Anything other than a custom payload is reported as an unregistered type.
        Ok(_) => Err(to_pyvalue_err(format!(
            "Custom data type \"{type_name}\" is not registered"
        ))),
        Err(e) => Err(to_pyvalue_err(format!("Deserialization failed: {e}"))),
    }
}
473
/// Builds a `CustomData` payload from a JSON value by calling the Python class's `from_json`.
///
/// The value is serialized to text, parsed with Python's `json.loads`, and the
/// result is passed to `data_class.from_json`; the returned instance is wrapped
/// in a `PythonCustomDataWrapper`.
///
/// # Errors
///
/// Returns an error if serialization, the `json` import, `json.loads`,
/// `from_json`, or wrapper construction fails.
#[cfg(feature = "python")]
fn py_json_deserialize_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    value: &serde_json::Value,
) -> Result<std::sync::Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::{CustomDataTrait, PythonCustomDataWrapper};

    pyo3::Python::attach(|py| {
        let json_text = serde_json::to_string(value)?;

        // Round-trip through Python's own JSON parser to obtain a Python object.
        let py_value = py
            .import("json")
            .map_err(|e| anyhow::anyhow!("Failed to import json: {e}"))?
            .call_method1("loads", (json_text,))
            .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {e}"))?;

        let class = data_class.bind(py);
        let instance = class
            .call_method1("from_json", (py_value,))
            .map_err(|e| anyhow::anyhow!("Failed to call from_json: {e}"))?;

        let wrapper = PythonCustomDataWrapper::new(py, &instance)
            .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;

        let custom: Arc<dyn CustomDataTrait> = Arc::new(wrapper);
        Ok(custom)
    })
}
504
/// Encodes `CustomData` items to `RecordBatch` via Python `encode_record_batch_py`.
///
/// Every item is converted to its Python object and collected into a list. The
/// first converted object is then asked to encode the whole list through its
/// `encode_record_batch_py` method, and the resulting PyArrow batch is brought
/// back into Rust via `_export_to_c`.
///
/// # Errors
///
/// Returns an error if:
/// - any item fails to convert to a Python object,
/// - `items` is empty,
/// - the first object has no `encode_record_batch_py` method (a failing
///   attribute lookup is treated the same way),
/// - the Python call, the C export, or the Arrow import fails.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_encode_custom_data_to_record_batch(
    items: &[std::sync::Arc<dyn crate::data::CustomDataTrait>],
) -> Result<arrow::record_batch::RecordBatch, anyhow::Error> {
    pyo3::Python::attach(|py| {
        let py_items = items
            .iter()
            .map(|item| item.to_pyobject(py))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| anyhow::anyhow!("Failed to convert to Python: {e}"))?;

        // Reuse the already-converted first object as the encoder owner instead
        // of converting the first item a second time.
        let Some(first_py) = py_items.first() else {
            anyhow::bail!("No items to encode");
        };

        let py_list = pyo3::types::PyList::new(py, &py_items)
            .map_err(|e| anyhow::anyhow!("Failed to create list: {e}"))?;

        let encoder_owner = first_py.bind(py);
        if !encoder_owner
            .hasattr("encode_record_batch_py")
            .unwrap_or(false)
        {
            anyhow::bail!("Instances must have encode_record_batch_py method");
        }

        let py_batch = encoder_owner
            .call_method1("encode_record_batch_py", (py_list,))
            .map_err(|e| anyhow::anyhow!("Failed to call encode_record_batch_py: {e}"))?;

        // Empty C structs whose addresses are handed to `_export_to_c` to fill in.
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty();
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::empty();

        py_batch.call_method1(
            "_export_to_c",
            (
                (&raw mut ffi_array as usize),
                (&raw mut ffi_schema as usize),
            ),
        )?;

        let schema = std::sync::Arc::new(arrow::datatypes::Schema::try_from(&ffi_schema)?);
        // SAFETY: relies on `_export_to_c` above having populated `ffi_array` as a
        // struct array matching `ffi_schema`, from which the data type is derived.
        let struct_array_data = unsafe {
            arrow::ffi::from_ffi_and_data_type(
                ffi_array,
                arrow::datatypes::DataType::Struct(schema.fields().clone()),
            )?
        };
        let struct_array = arrow::array::StructArray::from(struct_array_data);
        Ok(arrow::record_batch::RecordBatch::from(&struct_array))
    })
}
557
/// Converts a PyArrow schema object into an Arrow [`Schema`](arrow::datatypes::Schema).
///
/// The Python object's `_export_to_c` method is called with the address of an
/// empty C schema struct, which is then converted into the Rust type.
///
/// # Errors
///
/// Returns a `PyErr` if the `_export_to_c` call fails or the exported schema
/// cannot be converted.
#[cfg(all(feature = "python", feature = "arrow"))]
fn pyarrow_schema_to_arrow_schema(
    py_schema: &pyo3::Bound<'_, pyo3::PyAny>,
) -> PyResult<arrow::datatypes::Schema> {
    let mut c_schema = arrow::ffi::FFI_ArrowSchema::empty();
    let c_schema_addr = &raw mut c_schema as usize;
    py_schema.call_method1("_export_to_c", (c_schema_addr,))?;

    match arrow::datatypes::Schema::try_from(&c_schema) {
        Ok(schema) => Ok(schema),
        Err(e) => Err(to_pyvalue_err(format!(
            "Failed to import PyArrow schema: {e}"
        ))),
    }
}
567
/// Decodes a `RecordBatch` into `CustomData` values via Python `decode_record_batch_py`.
///
/// The batch is handed to PyArrow through `RecordBatch._import_from_c`, then
/// `data_class.decode_record_batch_py(metadata, batch)` is called and each item
/// of the returned list is wrapped as `Data::Custom`.
///
/// # Errors
///
/// Returns an error if the Arrow export, the PyArrow import, the Python call,
/// or wrapper construction fails, or if the Python method does not return a list.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_decode_record_batch_to_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    metadata: &std::collections::HashMap<String, String>,
    batch: arrow::record_batch::RecordBatch,
) -> Result<Vec<crate::data::Data>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        // Export the batch as a struct array plus its schema.
        let struct_array: arrow::array::StructArray = batch.into();
        let array_data = arrow::array::Array::to_data(&struct_array);
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::new(&array_data);
        let arrow::datatypes::DataType::Struct(fields) =
            arrow::array::Array::data_type(&struct_array)
        else {
            unreachable!()
        };
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::try_from(
            arrow::datatypes::DataType::Struct(fields.clone()),
        )?;

        let array_addr = &raw mut ffi_array as usize;
        let schema_addr = &raw mut ffi_schema as usize;
        let py_batch = py
            .import("pyarrow")?
            .getattr("RecordBatch")?
            .call_method1("_import_from_c", (array_addr, schema_addr))?;

        let metadata_py = pyo3::types::PyDict::new(py);
        metadata
            .iter()
            .try_for_each(|(key, val)| metadata_py.set_item(key, val))?;

        let decoded = data_class
            .bind(py)
            .call_method1("decode_record_batch_py", (metadata_py, py_batch))
            .map_err(|e| anyhow::anyhow!("Failed to call decode_record_batch_py: {e}"))?;

        let list = decoded
            .cast::<pyo3::types::PyList>()
            .map_err(|_| anyhow::anyhow!("Expected list from decode_record_batch_py"))?;

        // The first item that cannot be wrapped aborts the conversion.
        list.iter()
            .map(|item| -> Result<crate::data::Data, anyhow::Error> {
                let wrapper = PythonCustomDataWrapper::new(py, &item)
                    .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;
                Ok(crate::data::Data::Custom(
                    crate::data::CustomData::from_arc(Arc::new(wrapper)),
                ))
            })
            .collect()
    })
}
626
/// Registers a custom data **type** (class) with the catalog registry.
///
/// Use this when you prefer to pass the class instead of a sample instance.
/// The class must have:
/// - `type_name_static()` class method or `__name__` (used as type name in storage)
/// - `from_json(data)` class method
/// - `decode_record_batch_py(metadata, batch)` class method (checked only when the
///   `arrow` feature is enabled)
/// - Instances must have `ts_event`, `ts_init` and `encode_record_batch_py(items)`.
///
/// # Arguments
///
/// * `data_class` - The custom data class (e.g. `MarketTickPython` or `module.MarketTickData`)
///
/// # Errors
///
/// Returns a `PyErr` if the class lacks required methods, if the type name cannot be
/// extracted, or if registering the JSON deserializer or Arrow encoder/decoder fails.
///
/// # Example
///
/// ```python
/// from nautilus_trader.model.custom import customdataclass_pyo3
/// from nautilus_trader.model import register_custom_data_class
///
/// @customdataclass_pyo3()
/// class MarketTickPython:
///     symbol: str = ""
///     price: float = 0.0
///     volume: int = 0
///
/// register_custom_data_class(MarketTickPython)
/// ```
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
pub fn register_custom_data_class(data_class: &Bound<'_, PyAny>) -> PyResult<()> {
    use std::sync::Arc;

    use crate::data::registry;

    // Prefer the explicit static type name; fall back to the Python class name.
    let type_name: String = if data_class.hasattr("type_name_static")? {
        data_class.call_method0("type_name_static")?.extract()?
    } else {
        data_class.getattr("__name__")?.extract()?
    };

    #[cfg(feature = "arrow")]
    if !data_class.hasattr("decode_record_batch_py")? {
        return Err(to_pytype_err(
            "Custom data class must have decode_record_batch_py(metadata, batch) class method",
        ));
    }

    if !data_class.hasattr("from_json")? {
        return Err(to_pytype_err(
            "Custom data class must have from_json(data) class method (Rust macro provides it)",
        ));
    }

    register_python_data_class(&type_name, data_class);

    // Best effort: the result of registering the extractor is intentionally ignored.
    if let Some(extractor) = registry::get_rust_extractor(&type_name) {
        let _ = registry::ensure_py_extractor_registered(&type_name, extractor);
    }

    let data_class_for_json = data_class.clone().unbind();

    // The helper attaches to the interpreter itself, so the owned class handle
    // can be borrowed directly.
    let json_deserializer = Box::new(
        move |value: serde_json::Value| -> Result<Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
            py_json_deserialize_custom_data(&data_class_for_json, &value)
        },
    );

    registry::ensure_json_deserializer_registered(&type_name, json_deserializer).map_err(|e| {
        to_pyruntime_err(format!(
            "Failed to register JSON deserializer for {type_name}: {e}"
        ))
    })?;

    #[cfg(feature = "arrow")]
    {
        let data_class_for_decode = data_class.clone().unbind();

        // Schema preference: an exportable `_schema` attribute on the class, then a
        // schema already known to the registry, then an empty schema.
        let pyarrow_schema = data_class
            .getattr("_schema")
            .ok()
            .filter(|s| s.hasattr("_export_to_c").unwrap_or(false));
        let schema = if let Some(py_schema) = pyarrow_schema {
            Arc::new(pyarrow_schema_to_arrow_schema(&py_schema)?)
        } else if let Some(schema) = registry::get_arrow_schema(&type_name) {
            schema
        } else {
            Arc::new(arrow::datatypes::Schema::empty())
        };

        let encoder = Box::new(
            move |items: &[Arc<dyn crate::data::CustomDataTrait>]| -> Result<
                arrow::record_batch::RecordBatch,
                anyhow::Error,
            > { py_encode_custom_data_to_record_batch(items) },
        );

        // As with the JSON path, the decode helper attaches to the interpreter itself.
        let decoder = Box::new(
            move |metadata: &std::collections::HashMap<String, String>,
                  batch: arrow::record_batch::RecordBatch|
                  -> Result<Vec<crate::data::Data>, anyhow::Error> {
                py_decode_record_batch_to_custom_data(&data_class_for_decode, metadata, batch)
            },
        );

        registry::ensure_arrow_registered(&type_name, schema, encoder, decoder).map_err(|e| {
            to_pyruntime_err(format!(
                "Failed to register Arrow encoder/decoder for {type_name}: {e}"
            ))
        })?;
    }

    Ok(())
}
753
754/// Transforms the given Python objects into a vector of [`FundingRateUpdate`] objects.
755///
756/// # Errors
757///
758/// Returns a `PyErr` if element conversion fails or the data is not monotonically increasing.
759pub fn pyobjects_to_funding_rates(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<FundingRateUpdate>> {
760    let funding_rates: Vec<FundingRateUpdate> = data
761        .into_iter()
762        .map(|obj| FundingRateUpdate::from_pyobject(&obj))
763        .collect::<PyResult<Vec<FundingRateUpdate>>>()?;
764
765    // Validate monotonically increasing
766    if !is_monotonically_increasing_by_init(&funding_rates) {
767        return Err(to_pyvalue_err(ERROR_MONOTONICITY));
768    }
769
770    Ok(funding_rates)
771}
772
#[cfg(all(test, feature = "python", feature = "ffi"))]
mod tests {
    use std::{ffi::CString, ptr::NonNull};

    use nautilus_core::ffi::cvec::CVec;
    use pyo3::{prelude::*, types::PyCapsule};
    use rstest::rstest;

    use super::{DataFfiCVec, drop_cvec_pycapsule};
    use crate::data::{DataFFI, stubs::stub_bar};

    /// Wraps `cvec` in a capsule with the given name and a no-op destructor.
    fn data_ffi_capsule(
        py: Python<'_>,
        cvec: DataFfiCVec,
        name: Option<CString>,
    ) -> Bound<'_, PyCapsule> {
        PyCapsule::new_with_destructor::<DataFfiCVec, _>(py, cvec, name, |_, _| {}).unwrap()
    }

    #[rstest]
    fn test_drop_cvec_pycapsule_rejects_wrong_capsule_name() {
        Python::initialize();
        Python::attach(|py| {
            let wrong_name = CString::new("wrong.DataFFI.CVec").unwrap();
            let capsule = data_ffi_capsule(py, DataFfiCVec(CVec::empty()), Some(wrong_name));

            let message = drop_cvec_pycapsule(capsule.as_any())
                .unwrap_err()
                .to_string();

            assert!(message.contains("Invalid DataFFI CVec PyCapsule"));
        });
    }

    #[rstest]
    fn test_drop_cvec_pycapsule_rejects_len_greater_than_cap() {
        Python::initialize();
        Python::attach(|py| {
            // Non-null pointer with inconsistent metadata; rejected before any use.
            let inconsistent = CVec {
                ptr: NonNull::<u8>::dangling().as_ptr().cast(),
                len: 2,
                cap: 1,
            };
            let capsule = data_ffi_capsule(
                py,
                DataFfiCVec(inconsistent),
                Some(DataFfiCVec::capsule_name()),
            );

            let message = drop_cvec_pycapsule(capsule.as_any())
                .unwrap_err()
                .to_string();

            assert!(message.contains("Invalid DataFFI CVec metadata: len (2) > cap (1)"));
        });
    }

    #[rstest]
    fn test_drop_cvec_pycapsule_rejects_null_non_empty_pointer() {
        Python::initialize();
        Python::attach(|py| {
            let null_backed = CVec {
                ptr: std::ptr::null_mut(),
                len: 1,
                cap: 1,
            };
            let capsule = data_ffi_capsule(
                py,
                DataFfiCVec(null_backed),
                Some(DataFfiCVec::capsule_name()),
            );

            let message = drop_cvec_pycapsule(capsule.as_any())
                .unwrap_err()
                .to_string();

            assert!(
                message.contains("Invalid DataFFI CVec metadata: null ptr with len (1) and cap (1)")
            );
        });
    }

    #[rstest]
    fn test_drop_cvec_pycapsule_accepts_empty_cvec() {
        Python::initialize();
        Python::attach(|py| {
            let empty = DataFfiCVec(CVec::empty());
            let capsule = data_ffi_capsule(py, empty, Some(DataFfiCVec::capsule_name()));

            drop_cvec_pycapsule(capsule.as_any()).unwrap();
        });
    }

    #[rstest]
    fn test_drop_cvec_pycapsule_allows_repeated_drop() {
        Python::initialize();
        Python::attach(|py| {
            let payload: DataFfiCVec = vec![DataFFI::Bar(stub_bar())].into();
            let capsule = data_ffi_capsule(py, payload, Some(DataFfiCVec::capsule_name()));

            // The second call must succeed as well instead of freeing twice.
            for _ in 0..2 {
                drop_cvec_pycapsule(capsule.as_any()).unwrap();
            }
        });
    }
}