// nautilus_model/python/data/mod.rs
pub mod bar;
19pub mod bet;
20pub mod close;
21#[cfg(feature = "python")]
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod status;
34pub mod trade;
35
36#[cfg(feature = "ffi")]
37use nautilus_core::ffi::cvec::CVec;
38#[cfg(feature = "python")]
39use nautilus_core::python::{
40 params::{params_to_pydict, pydict_to_params},
41 to_pyruntime_err, to_pytype_err, to_pyvalue_err,
42};
43#[cfg(feature = "python")]
44use pyo3::types::PyDict;
45use pyo3::{prelude::*, types::PyCapsule};
46
47#[cfg(feature = "cython-compat")]
48use crate::data::DataFFI;
49use crate::data::{
50 Bar, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
51 OrderBookDelta, QuoteTick, TradeTick, close::InstrumentClose,
52 is_monotonically_increasing_by_init, register_python_data_class,
53};
54
/// Error message used when input data is not sorted ascending by `ts_init`.
const ERROR_MONOTONICITY: &str = "`data` was not monotonically increasing by the `ts_init` field";
56
#[pymethods]
#[cfg_attr(feature = "python", pyo3_stub_gen::derive::gen_stub_pymethods)]
impl DataType {
    /// Creates a new `DataType` from a type name, an optional metadata dict,
    /// and an optional identifier (exposed to Python as the constructor).
    #[new]
    #[pyo3(signature = (type_name, metadata=None, identifier=None))]
    fn py_new(
        py: Python<'_>,
        type_name: &str,
        metadata: Option<Py<PyDict>>,
        identifier: Option<String>,
    ) -> PyResult<Self> {
        // Convert the Python dict (if provided) into the internal params form.
        let params = match metadata {
            None => None,
            Some(d) => pydict_to_params(py, d)?,
        };
        Ok(Self::new(type_name, params, identifier))
    }

    /// Rich comparison: equality/inequality compare the rendered `topic`
    /// strings; ordering comparisons return Python's `NotImplemented`.
    fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp, py: Python<'_>) -> Py<PyAny> {
        use nautilus_core::python::IntoPyObjectNautilusExt;
        match op {
            pyo3::pyclass::CompareOp::Eq => (self.topic() == other.topic()).into_py_any_unwrap(py),
            pyo3::pyclass::CompareOp::Ne => (self.topic() != other.topic()).into_py_any_unwrap(py),
            _ => py.NotImplemented(),
        }
    }

    /// Hash via the precomputed hash value.
    /// NOTE(review): assumes `precomputed_hash` is derived from the same fields
    /// as `topic`, keeping hash consistent with `__richcmp__` equality — confirm
    /// in the `DataType` implementation.
    fn __hash__(&self) -> isize {
        self.precomputed_hash() as isize
    }

    /// Returns the type name component of this data type.
    #[getter]
    #[pyo3(name = "type_name")]
    fn py_type_name(&self) -> &str {
        self.type_name()
    }

    /// Returns the metadata as a Python dict, or `None` when no metadata is set.
    #[getter]
    #[pyo3(name = "metadata")]
    fn py_metadata(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
        match self.metadata() {
            None => Ok(py.None()),
            Some(p) => Ok(params_to_pydict(py, p)?
                .bind(py)
                .clone()
                .into_any()
                .unbind()),
        }
    }

    /// Returns the full topic string (also the basis for equality and hashing).
    #[getter]
    #[pyo3(name = "topic")]
    fn py_topic(&self) -> &str {
        self.topic()
    }

    /// Returns the optional identifier, or `None` when not set.
    #[getter]
    #[pyo3(name = "identifier")]
    fn py_identifier(&self) -> Option<&str> {
        self.identifier()
    }
}
124
125#[must_use]
144pub fn data_to_pycapsule(py: Python, data: Data) -> Py<PyAny> {
145 #[cfg(feature = "cython-compat")]
146 {
147 if let Ok(ffi_data) = DataFFI::try_from(data.clone()) {
149 let capsule = PyCapsule::new_with_destructor(py, ffi_data, None, |_, _| {})
150 .expect("Error creating `PyCapsule` for `DataFFI` ");
151 return capsule.into_any().unbind();
152 }
153 }
154
155 let capsule = PyCapsule::new_with_destructor(py, data, None, |_, _| {})
157 .expect("Error creating `PyCapsule` for `Data` ");
158 capsule.into_any().unbind()
159}
160
/// Drops the `CVec` of `DataFFI` elements wrapped by the given `PyCapsule`,
/// reclaiming the Rust-owned allocation previously handed across the FFI boundary.
///
/// # Panics
///
/// Panics if the argument is not a `PyCapsule` or its pointer is invalid.
#[cfg(feature = "ffi")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
#[allow(unsafe_code)]
pub fn drop_cvec_pycapsule(capsule: &Bound<'_, PyAny>) {
    let capsule: &Bound<'_, PyCapsule> = capsule
        .cast::<PyCapsule>()
        .expect("Error on downcast to `&PyCapsule`");
    // SAFETY: the capsule pointer is borrowed as a `CVec`. NOTE(review):
    // soundness requires the caller to pass a capsule created by this crate
    // wrapping a valid `CVec` — confirm at call sites.
    let cvec: &CVec = unsafe { &*(capsule.pointer_checked(None).unwrap().as_ptr() as *const CVec) };
    // SAFETY: rebuilds the original `Vec<DataFFI>` from its raw parts so that
    // dropping it frees the elements and backing allocation exactly once.
    let data: Vec<crate::data::DataFFI> =
        unsafe { Vec::from_raw_parts(cvec.ptr.cast::<crate::data::DataFFI>(), cvec.len, cvec.cap) };
    drop(data);
}
194
/// Stub compiled when the `ffi` feature is disabled; always panics because
/// there is no `CVec` representation to drop in this configuration.
#[cfg(not(feature = "ffi"))]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
pub fn drop_cvec_pycapsule(_capsule: &Bound<'_, PyAny>) {
    panic!("`ffi` feature is not enabled");
}
207
208pub fn pyobjects_to_book_deltas(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<OrderBookDelta>> {
214 let deltas: Vec<OrderBookDelta> = data
215 .into_iter()
216 .map(|obj| OrderBookDelta::from_pyobject(&obj))
217 .collect::<PyResult<Vec<OrderBookDelta>>>()?;
218
219 if !is_monotonically_increasing_by_init(&deltas) {
221 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
222 }
223
224 Ok(deltas)
225}
226
227pub fn pyobjects_to_quotes(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<QuoteTick>> {
233 let quotes: Vec<QuoteTick> = data
234 .into_iter()
235 .map(|obj| QuoteTick::from_pyobject(&obj))
236 .collect::<PyResult<Vec<QuoteTick>>>()?;
237
238 if !is_monotonically_increasing_by_init("es) {
240 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
241 }
242
243 Ok(quotes)
244}
245
246pub fn pyobjects_to_trades(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<TradeTick>> {
252 let trades: Vec<TradeTick> = data
253 .into_iter()
254 .map(|obj| TradeTick::from_pyobject(&obj))
255 .collect::<PyResult<Vec<TradeTick>>>()?;
256
257 if !is_monotonically_increasing_by_init(&trades) {
259 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
260 }
261
262 Ok(trades)
263}
264
265pub fn pyobjects_to_bars(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<Bar>> {
271 let bars: Vec<Bar> = data
272 .into_iter()
273 .map(|obj| Bar::from_pyobject(&obj))
274 .collect::<PyResult<Vec<Bar>>>()?;
275
276 if !is_monotonically_increasing_by_init(&bars) {
278 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
279 }
280
281 Ok(bars)
282}
283
284pub fn pyobjects_to_mark_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<MarkPriceUpdate>> {
290 let mark_prices: Vec<MarkPriceUpdate> = data
291 .into_iter()
292 .map(|obj| MarkPriceUpdate::from_pyobject(&obj))
293 .collect::<PyResult<Vec<MarkPriceUpdate>>>()?;
294
295 if !is_monotonically_increasing_by_init(&mark_prices) {
297 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
298 }
299
300 Ok(mark_prices)
301}
302
303pub fn pyobjects_to_index_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<IndexPriceUpdate>> {
309 let index_prices: Vec<IndexPriceUpdate> = data
310 .into_iter()
311 .map(|obj| IndexPriceUpdate::from_pyobject(&obj))
312 .collect::<PyResult<Vec<IndexPriceUpdate>>>()?;
313
314 if !is_monotonically_increasing_by_init(&index_prices) {
316 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
317 }
318
319 Ok(index_prices)
320}
321
322pub fn pyobjects_to_instrument_closes(
328 data: Vec<Bound<'_, PyAny>>,
329) -> PyResult<Vec<InstrumentClose>> {
330 let closes: Vec<InstrumentClose> = data
331 .into_iter()
332 .map(|obj| InstrumentClose::from_pyobject(&obj))
333 .collect::<PyResult<Vec<InstrumentClose>>>()?;
334
335 if !is_monotonically_increasing_by_init(&closes) {
337 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
338 }
339
340 Ok(closes)
341}
342
/// Deserializes raw JSON bytes into a registered [`CustomData`] instance.
///
/// # Errors
///
/// Returns an error if the payload is not valid JSON, if registry
/// deserialization fails, or if `type_name` has no registered custom type.
#[cfg(feature = "python")]
#[pyfunction]
pub fn deserialize_custom_from_json(type_name: &str, payload: &[u8]) -> PyResult<CustomData> {
    use crate::data::registry;

    // Parse the bytes up front so JSON syntax errors are reported distinctly
    // from registry/deserialization failures.
    let value: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| to_pyvalue_err(format!("Invalid JSON: {e}")))?;

    let deserialized = registry::deserialize_custom_from_json(type_name, &value)
        .map_err(|e| to_pyvalue_err(format!("Deserialization failed: {e}")))?;

    match deserialized {
        Some(Data::Custom(custom)) => Ok(custom),
        _ => Err(to_pyvalue_err(format!(
            "Custom data type \"{type_name}\" is not registered"
        ))),
    }
}
363
/// Builds a Python-backed custom data instance from a JSON value by
/// round-tripping through Python's `json.loads` and the class's `from_json`.
#[cfg(feature = "python")]
fn py_json_deserialize_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    value: &serde_json::Value,
) -> Result<std::sync::Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        // Serialize to a string so Python's own parser constructs the dict.
        let serialized = serde_json::to_string(&value)?;

        let parsed = py
            .import("json")
            .map_err(|e| anyhow::anyhow!("Failed to import json: {e}"))?
            .call_method1("loads", (serialized,))
            .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {e}"))?;

        let instance = data_class
            .bind(py)
            .call_method1("from_json", (parsed,))
            .map_err(|e| anyhow::anyhow!("Failed to call from_json: {e}"))?;

        let wrapped = PythonCustomDataWrapper::new(py, &instance)
            .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;

        Ok(Arc::new(wrapped) as Arc<dyn crate::data::CustomDataTrait>)
    })
}
394
/// Encodes a slice of custom data items into an Arrow `RecordBatch` by calling
/// the first item's Python `encode_record_batch_py` method and importing the
/// result back into Rust via the Arrow C Data Interface.
///
/// Errors if `items` is empty, if conversion to Python fails, or if the
/// instances do not expose `encode_record_batch_py`.
#[allow(unsafe_code)]
#[cfg(feature = "python")]
fn py_encode_custom_data_to_record_batch(
    items: &[std::sync::Arc<dyn crate::data::CustomDataTrait>],
) -> Result<arrow::record_batch::RecordBatch, anyhow::Error> {
    pyo3::Python::attach(|py| {
        // Convert every item to its Python object and gather them in a list.
        let py_items: Result<Vec<_>, _> = items.iter().map(|item| item.to_pyobject(py)).collect();
        let py_items = py_items.map_err(|e| anyhow::anyhow!("Failed to convert to Python: {e}"))?;
        let py_list = pyo3::types::PyList::new(py, &py_items)
            .map_err(|e| anyhow::anyhow!("Failed to create list: {e}"))?;

        // The encode method is looked up on the first instance.
        let first = items
            .first()
            .ok_or_else(|| anyhow::anyhow!("No items to encode"))?;
        let first_py = first.to_pyobject(py)?;

        if first_py
            .bind(py)
            .hasattr("encode_record_batch_py")
            .unwrap_or(false)
        {
            let py_batch = first_py
                .bind(py)
                .call_method1("encode_record_batch_py", (py_list,))
                .map_err(|e| anyhow::anyhow!("Failed to call encode_record_batch_py: {e}"))?;

            // Empty FFI structs for the Python side to fill via `_export_to_c`
            // (raw pointers are passed as integer addresses, per the pyarrow
            // Arrow C Data Interface convention).
            let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty();
            let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::empty();

            py_batch.call_method1(
                "_export_to_c",
                (
                    (&raw mut ffi_array as usize),
                    (&raw mut ffi_schema as usize),
                ),
            )?;

            let schema = std::sync::Arc::new(arrow::datatypes::Schema::try_from(&ffi_schema)?);
            // SAFETY: `ffi_array` was populated by pyarrow's `_export_to_c`
            // above, and the struct data type is derived from the schema that
            // was exported alongside it, so the array/type pair is consistent.
            let struct_array_data = unsafe {
                arrow::ffi::from_ffi_and_data_type(
                    ffi_array,
                    arrow::datatypes::DataType::Struct(schema.fields().clone()),
                )?
            };
            let struct_array = arrow::array::StructArray::from(struct_array_data);
            Ok(arrow::record_batch::RecordBatch::from(&struct_array))
        } else {
            anyhow::bail!("Instances must have encode_record_batch_py method")
        }
    })
}
447
/// Decodes an Arrow `RecordBatch` into custom [`Data`] values by exporting the
/// batch to Python through the Arrow C Data Interface and invoking the class's
/// `decode_record_batch_py(metadata, batch)` class method.
#[allow(unsafe_code)]
#[cfg(feature = "python")]
fn py_decode_record_batch_to_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    metadata: &std::collections::HashMap<String, String>,
    batch: arrow::record_batch::RecordBatch,
) -> Result<Vec<crate::data::Data>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        // Flatten the batch into one StructArray so it crosses the C Data
        // Interface as a single array + schema pair.
        let struct_array: arrow::array::StructArray = batch.into();
        let array_data = arrow::array::Array::to_data(&struct_array);
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::new(&array_data);
        let fields = match arrow::array::Array::data_type(&struct_array) {
            arrow::datatypes::DataType::Struct(f) => f.clone(),
            // A StructArray's data type is always `Struct`.
            _ => unreachable!(),
        };
        let mut ffi_schema =
            arrow::ffi::FFI_ArrowSchema::try_from(arrow::datatypes::DataType::Struct(fields))?;

        // Raw pointers are passed as integer addresses per the pyarrow
        // `_import_from_c` convention. NOTE(review): pyarrow is expected to
        // take ownership of both FFI structs here — confirm move semantics.
        let pyarrow = py.import("pyarrow")?;
        let cls = pyarrow.getattr("RecordBatch")?;
        let py_batch = cls.call_method1(
            "_import_from_c",
            (
                (&raw mut ffi_array as usize),
                (&raw mut ffi_schema as usize),
            ),
        )?;

        // Mirror the metadata map into a Python dict for the decode call.
        let metadata_py = pyo3::types::PyDict::new(py);
        for (k, v) in metadata {
            metadata_py.set_item(k, v)?;
        }

        let py_list = data_class
            .bind(py)
            .call_method1("decode_record_batch_py", (metadata_py, py_batch))
            .map_err(|e| anyhow::anyhow!("Failed to call decode_record_batch_py: {e}"))?;

        let list = py_list
            .cast::<pyo3::types::PyList>()
            .map_err(|_| anyhow::anyhow!("Expected list from decode_record_batch_py"))?;

        // Wrap each decoded Python instance so it satisfies `CustomDataTrait`.
        let mut result = Vec::new();
        for item in list.iter() {
            let wrapper = PythonCustomDataWrapper::new(py, &item)
                .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;
            result.push(crate::data::Data::Custom(
                crate::data::CustomData::from_arc(Arc::new(wrapper)),
            ));
        }
        Ok(result)
    })
}
506
/// Registers a Python custom data class with the Rust data registries so it can
/// participate in JSON deserialization and Arrow record-batch encode/decode.
///
/// The class must provide `decode_record_batch_py(metadata, batch)` and
/// `from_json(data)` class methods; its type name is taken from
/// `type_name_static()` when present, otherwise from `__name__`.
///
/// # Errors
///
/// Returns an error if the class is missing a required method, or if
/// registering the JSON deserializer or Arrow encoder/decoder fails.
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
pub fn register_custom_data_class(data_class: &Bound<'_, PyAny>) -> PyResult<()> {
    use std::sync::Arc;

    use crate::data::registry;

    // Removed a dead `let _py = data_class.py();` binding — the token was
    // never used.

    if !data_class.hasattr("decode_record_batch_py")? {
        return Err(to_pytype_err(
            "Custom data class must have decode_record_batch_py(metadata, batch) class method",
        ));
    }

    // Prefer an explicit static type name; fall back to the Python class name.
    let type_name: String = if data_class.hasattr("type_name_static")? {
        data_class.call_method0("type_name_static")?.extract()?
    } else {
        data_class.getattr("__name__")?.extract()?
    };

    if !data_class.hasattr("from_json")? {
        return Err(to_pytype_err(
            "Custom data class must have from_json(data) class method (Rust macro provides it)",
        ));
    }

    register_python_data_class(&type_name, data_class);

    // If a Rust-side extractor exists for this type, best-effort ensure the
    // Python-facing registration is in sync (result intentionally ignored).
    if let Some(extractor) = registry::get_rust_extractor(&type_name) {
        let _ = registry::ensure_py_extractor_registered(&type_name, extractor);
    }

    // Separate owned handles for each closure that captures the class.
    let data_class_for_json = data_class.clone().unbind();
    let data_class_for_decode = data_class.clone().unbind();

    let json_deserializer = Box::new(
        move |value: serde_json::Value| -> Result<Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
            pyo3::Python::attach(|py| {
                py_json_deserialize_custom_data(&data_class_for_json.clone_ref(py), &value)
            })
        },
    );

    registry::ensure_json_deserializer_registered(&type_name, json_deserializer).map_err(|e| {
        to_pyruntime_err(format!(
            "Failed to register JSON deserializer for {type_name}: {e}"
        ))
    })?;

    // The real schema is produced by the Python encoder at encode time; an
    // empty placeholder satisfies the registry interface.
    let schema = Arc::new(arrow::datatypes::Schema::empty());

    let encoder = Box::new(
        move |items: &[Arc<dyn crate::data::CustomDataTrait>]| -> Result<
            arrow::record_batch::RecordBatch,
            anyhow::Error,
        > { py_encode_custom_data_to_record_batch(items) },
    );

    let decoder = Box::new(
        move |metadata: &std::collections::HashMap<String, String>,
              batch: arrow::record_batch::RecordBatch|
              -> Result<Vec<crate::data::Data>, anyhow::Error> {
            pyo3::Python::attach(|py| {
                py_decode_record_batch_to_custom_data(
                    &data_class_for_decode.clone_ref(py),
                    metadata,
                    batch,
                )
            })
        },
    );

    registry::ensure_arrow_registered(&type_name, schema, encoder, decoder).map_err(|e| {
        to_pyruntime_err(format!(
            "Failed to register Arrow encoder/decoder for {type_name}: {e}"
        ))
    })?;

    Ok(())
}
619
620pub fn pyobjects_to_funding_rates(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<FundingRateUpdate>> {
626 let funding_rates: Vec<FundingRateUpdate> = data
627 .into_iter()
628 .map(|obj| FundingRateUpdate::from_pyobject(&obj))
629 .collect::<PyResult<Vec<FundingRateUpdate>>>()?;
630
631 if !is_monotonically_increasing_by_init(&funding_rates) {
633 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
634 }
635
636 Ok(funding_rates)
637}