nautilus_model/python/data/
mod.rs1pub mod bar;
19pub mod bet;
20pub mod close;
21#[cfg(feature = "python")]
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod status;
34pub mod trade;
35
36#[cfg(feature = "ffi")]
37use nautilus_core::ffi::cvec::CVec;
38#[cfg(feature = "python")]
39use nautilus_core::python::{
40 params::{params_to_pydict, pydict_to_params},
41 to_pyruntime_err, to_pytype_err, to_pyvalue_err,
42};
43#[cfg(feature = "python")]
44use pyo3::types::PyDict;
45use pyo3::{prelude::*, types::PyCapsule};
46
47#[cfg(feature = "cython-compat")]
48use crate::data::DataFFI;
49use crate::data::{
50 Bar, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
51 MarkPriceUpdate, OrderBookDelta, QuoteTick, TradeTick, close::InstrumentClose,
52 is_monotonically_increasing_by_init, register_python_data_class,
53};
54
/// Error message used by the `pyobjects_to_*` converters in this module when the
/// converted values fail the `is_monotonically_increasing_by_init` check.
const ERROR_MONOTONICITY: &str = "`data` was not monotonically increasing by the `ts_init` field";
56
57#[pymethods]
58#[cfg_attr(feature = "python", pyo3_stub_gen::derive::gen_stub_pymethods)]
59impl DataType {
60 #[new]
62 #[pyo3(signature = (type_name, metadata=None, identifier=None))]
63 fn py_new(
64 py: Python<'_>,
65 type_name: &str,
66 metadata: Option<Py<PyDict>>,
67 identifier: Option<String>,
68 ) -> PyResult<Self> {
69 let params = match metadata {
70 None => None,
71 Some(d) => pydict_to_params(py, &d)?,
72 };
73 Ok(Self::new(type_name, params, identifier))
74 }
75
76 fn __richcmp__(&self, other: &Self, op: pyo3::pyclass::CompareOp, py: Python<'_>) -> Py<PyAny> {
77 use nautilus_core::python::IntoPyObjectNautilusExt;
78
79 match op {
80 pyo3::pyclass::CompareOp::Eq => (self.topic() == other.topic()).into_py_any_unwrap(py),
81 pyo3::pyclass::CompareOp::Ne => (self.topic() != other.topic()).into_py_any_unwrap(py),
82 _ => py.NotImplemented(),
83 }
84 }
85
86 fn __hash__(&self) -> isize {
87 self.precomputed_hash() as isize
88 }
89
90 #[getter]
92 #[pyo3(name = "type_name")]
93 fn py_type_name(&self) -> &str {
94 self.type_name()
95 }
96
97 #[getter]
99 #[pyo3(name = "metadata")]
100 fn py_metadata(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
101 match self.metadata() {
102 None => Ok(py.None()),
103 Some(p) => Ok(params_to_pydict(py, p)?
104 .bind(py)
105 .clone()
106 .into_any()
107 .unbind()),
108 }
109 }
110
111 #[getter]
113 #[pyo3(name = "topic")]
114 fn py_topic(&self) -> &str {
115 self.topic()
116 }
117
118 #[getter]
120 #[pyo3(name = "identifier")]
121 fn py_identifier(&self) -> Option<&str> {
122 self.identifier()
123 }
124}
125
126#[must_use]
145pub fn data_to_pycapsule(py: Python, data: Data) -> Py<PyAny> {
146 #[cfg(feature = "cython-compat")]
147 {
148 if let Ok(ffi_data) = DataFFI::try_from(data.clone()) {
150 let capsule = PyCapsule::new_with_destructor(py, ffi_data, None, |_, _| {})
151 .expect("Error creating `PyCapsule` for `DataFFI` ");
152 return capsule.into_any().unbind();
153 }
154 }
155
156 let capsule = PyCapsule::new_with_destructor(py, data, None, |_, _| {})
158 .expect("Error creating `PyCapsule` for `Data` ");
159 capsule.into_any().unbind()
160}
161
162#[cfg(feature = "ffi")]
183#[pyfunction]
184#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
185#[allow(unsafe_code)]
186pub fn drop_cvec_pycapsule(capsule: &Bound<'_, PyAny>) {
187 let capsule: &Bound<'_, PyCapsule> = capsule
188 .cast::<PyCapsule>()
189 .expect("Error on downcast to `&PyCapsule`");
190 let cvec: &CVec = unsafe { &*(capsule.pointer_checked(None).unwrap().as_ptr() as *const CVec) };
191 let data: Vec<crate::data::DataFFI> =
192 unsafe { Vec::from_raw_parts(cvec.ptr.cast::<crate::data::DataFFI>(), cvec.len, cvec.cap) };
193 drop(data);
194}
195
196#[cfg(not(feature = "ffi"))]
197#[pyfunction]
198#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
199pub fn drop_cvec_pycapsule(_capsule: &Bound<'_, PyAny>) {
206 panic!("`ffi` feature is not enabled");
207}
208
209pub fn pyobjects_to_book_deltas(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<OrderBookDelta>> {
215 let deltas: Vec<OrderBookDelta> = data
216 .into_iter()
217 .map(|obj| OrderBookDelta::from_pyobject(&obj))
218 .collect::<PyResult<Vec<OrderBookDelta>>>()?;
219
220 if !is_monotonically_increasing_by_init(&deltas) {
222 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
223 }
224
225 Ok(deltas)
226}
227
228pub fn pyobjects_to_quotes(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<QuoteTick>> {
234 let quotes: Vec<QuoteTick> = data
235 .into_iter()
236 .map(|obj| QuoteTick::from_pyobject(&obj))
237 .collect::<PyResult<Vec<QuoteTick>>>()?;
238
239 if !is_monotonically_increasing_by_init("es) {
241 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
242 }
243
244 Ok(quotes)
245}
246
247pub fn pyobjects_to_trades(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<TradeTick>> {
253 let trades: Vec<TradeTick> = data
254 .into_iter()
255 .map(|obj| TradeTick::from_pyobject(&obj))
256 .collect::<PyResult<Vec<TradeTick>>>()?;
257
258 if !is_monotonically_increasing_by_init(&trades) {
260 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
261 }
262
263 Ok(trades)
264}
265
266pub fn pyobjects_to_bars(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<Bar>> {
272 let bars: Vec<Bar> = data
273 .into_iter()
274 .map(|obj| Bar::from_pyobject(&obj))
275 .collect::<PyResult<Vec<Bar>>>()?;
276
277 if !is_monotonically_increasing_by_init(&bars) {
279 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
280 }
281
282 Ok(bars)
283}
284
285pub fn pyobjects_to_mark_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<MarkPriceUpdate>> {
291 let mark_prices: Vec<MarkPriceUpdate> = data
292 .into_iter()
293 .map(|obj| MarkPriceUpdate::from_pyobject(&obj))
294 .collect::<PyResult<Vec<MarkPriceUpdate>>>()?;
295
296 if !is_monotonically_increasing_by_init(&mark_prices) {
298 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
299 }
300
301 Ok(mark_prices)
302}
303
304pub fn pyobjects_to_index_prices(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<IndexPriceUpdate>> {
310 let index_prices: Vec<IndexPriceUpdate> = data
311 .into_iter()
312 .map(|obj| IndexPriceUpdate::from_pyobject(&obj))
313 .collect::<PyResult<Vec<IndexPriceUpdate>>>()?;
314
315 if !is_monotonically_increasing_by_init(&index_prices) {
317 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
318 }
319
320 Ok(index_prices)
321}
322
323pub fn pyobjects_to_instrument_statuses(
329 data: Vec<Bound<'_, PyAny>>,
330) -> PyResult<Vec<InstrumentStatus>> {
331 let statuses: Vec<InstrumentStatus> = data
332 .into_iter()
333 .map(|obj| InstrumentStatus::from_pyobject(&obj))
334 .collect::<PyResult<Vec<InstrumentStatus>>>()?;
335
336 if !is_monotonically_increasing_by_init(&statuses) {
337 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
338 }
339
340 Ok(statuses)
341}
342
343pub fn pyobjects_to_instrument_closes(
349 data: Vec<Bound<'_, PyAny>>,
350) -> PyResult<Vec<InstrumentClose>> {
351 let closes: Vec<InstrumentClose> = data
352 .into_iter()
353 .map(|obj| InstrumentClose::from_pyobject(&obj))
354 .collect::<PyResult<Vec<InstrumentClose>>>()?;
355
356 if !is_monotonically_increasing_by_init(&closes) {
358 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
359 }
360
361 Ok(closes)
362}
363
/// Deserializes a JSON payload into `CustomData` using the registry entry for `type_name`.
///
/// # Errors
///
/// Returns a `PyErr` if `payload` is not valid JSON, if the registry deserializer
/// fails, or if the registry does not produce a `Data::Custom` value for `type_name`
/// (reported as the type not being registered).
#[cfg(feature = "python")]
#[pyfunction]
pub fn deserialize_custom_from_json(type_name: &str, payload: &[u8]) -> PyResult<CustomData> {
    use crate::data::registry;

    let value = serde_json::from_slice::<serde_json::Value>(payload)
        .map_err(|e| to_pyvalue_err(format!("Invalid JSON: {e}")))?;

    let decoded = registry::deserialize_custom_from_json(type_name, &value)
        .map_err(|e| to_pyvalue_err(format!("Deserialization failed: {e}")))?;

    match decoded {
        Some(Data::Custom(custom)) => Ok(custom),
        // Covers both "nothing returned" and "returned a non-custom variant".
        _ => Err(to_pyvalue_err(format!(
            "Custom data type \"{type_name}\" is not registered"
        ))),
    }
}
384
/// Builds a custom data instance from JSON by delegating to the Python class.
///
/// The JSON value is serialized to a string, parsed with Python's `json.loads`,
/// passed to `data_class.from_json(...)`, and the resulting Python instance is
/// wrapped in a `PythonCustomDataWrapper`.
///
/// # Errors
///
/// Returns an error if serialization fails, if the `json` module cannot be imported,
/// if `json.loads` or `from_json` raises, or if the wrapper cannot be created.
#[cfg(feature = "python")]
fn py_json_deserialize_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    value: &serde_json::Value,
) -> Result<std::sync::Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        let serialized = serde_json::to_string(value)?;

        // Round-trip through Python's own JSON parser to obtain native Python objects.
        let parsed = py
            .import("json")
            .map_err(|e| anyhow::anyhow!("Failed to import json: {e}"))?
            .call_method1("loads", (serialized,))
            .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {e}"))?;

        let instance = data_class
            .bind(py)
            .call_method1("from_json", (parsed,))
            .map_err(|e| anyhow::anyhow!("Failed to call from_json: {e}"))?;

        let wrapped: Arc<dyn crate::data::CustomDataTrait> = Arc::new(
            PythonCustomDataWrapper::new(py, &instance)
                .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?,
        );
        Ok(wrapped)
    })
}
415
/// Encodes custom data items into an Arrow `RecordBatch` via the Python class.
///
/// All items are converted to Python objects and collected in a list, which is
/// handed to the first item's `encode_record_batch_py` method. The returned Python
/// batch is exported through `_export_to_c` into Arrow FFI structs and imported
/// back as a Rust `RecordBatch`.
///
/// # Errors
///
/// Returns an error if an item cannot be converted to Python, if `items` is empty,
/// if the first item has no `encode_record_batch_py` method, if any Python call
/// raises, or if the exported schema/array cannot be imported.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_encode_custom_data_to_record_batch(
    items: &[std::sync::Arc<dyn crate::data::CustomDataTrait>],
) -> Result<arrow::record_batch::RecordBatch, anyhow::Error> {
    pyo3::Python::attach(|py| {
        let converted = items
            .iter()
            .map(|item| item.to_pyobject(py))
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| anyhow::anyhow!("Failed to convert to Python: {e}"))?;
        let py_list = pyo3::types::PyList::new(py, &converted)
            .map_err(|e| anyhow::anyhow!("Failed to create list: {e}"))?;

        let Some(first) = items.first() else {
            anyhow::bail!("No items to encode");
        };
        let first_py = first.to_pyobject(py)?;
        let encoder_owner = first_py.bind(py);

        // A failing attribute lookup is treated the same as a missing method.
        if !encoder_owner
            .hasattr("encode_record_batch_py")
            .unwrap_or(false)
        {
            anyhow::bail!("Instances must have encode_record_batch_py method");
        }

        let py_batch = encoder_owner
            .call_method1("encode_record_batch_py", (py_list,))
            .map_err(|e| anyhow::anyhow!("Failed to call encode_record_batch_py: {e}"))?;

        // Python fills these structs through the raw addresses passed below.
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::empty();
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::empty();
        let array_addr = &raw mut ffi_array as usize;
        let schema_addr = &raw mut ffi_schema as usize;
        py_batch.call_method1("_export_to_c", (array_addr, schema_addr))?;

        let schema = std::sync::Arc::new(arrow::datatypes::Schema::try_from(&ffi_schema)?);
        let struct_type = arrow::datatypes::DataType::Struct(schema.fields().clone());

        // SAFETY: assumes `_export_to_c` populated `ffi_array` with a valid array
        // matching `struct_type` - TODO confirm against the Python-side contract.
        let struct_array_data =
            unsafe { arrow::ffi::from_ffi_and_data_type(ffi_array, struct_type) }?;

        let struct_array = arrow::array::StructArray::from(struct_array_data);
        Ok(arrow::record_batch::RecordBatch::from(&struct_array))
    })
}
468
/// Converts a Python schema object into an Arrow `Schema`.
///
/// Calls the object's `_export_to_c` method with the address of an empty
/// `FFI_ArrowSchema`, then imports the result.
///
/// # Errors
///
/// Returns a `PyErr` if `_export_to_c` raises or if the exported schema cannot be
/// converted into an Arrow `Schema`.
#[cfg(all(feature = "python", feature = "arrow"))]
fn pyarrow_schema_to_arrow_schema(
    py_schema: &pyo3::Bound<'_, pyo3::PyAny>,
) -> PyResult<arrow::datatypes::Schema> {
    let mut exported = arrow::ffi::FFI_ArrowSchema::empty();
    let exported_addr = &raw mut exported as usize;
    py_schema.call_method1("_export_to_c", (exported_addr,))?;

    match arrow::datatypes::Schema::try_from(&exported) {
        Ok(schema) => Ok(schema),
        Err(e) => Err(to_pyvalue_err(format!(
            "Failed to import PyArrow schema: {e}"
        ))),
    }
}
478
/// Decodes an Arrow `RecordBatch` into custom `Data` values via the Python class.
///
/// The batch is exposed to Python as a `pyarrow.RecordBatch` through the
/// `_import_from_c` entry point, then passed together with `metadata` (copied into
/// a Python dict) to `data_class.decode_record_batch_py`. Every element of the
/// returned list is wrapped in a `PythonCustomDataWrapper`.
///
/// # Errors
///
/// Returns an error if the FFI schema cannot be built, if `pyarrow` cannot be
/// imported, if any Python call raises, if the decoder does not return a list, or
/// if a wrapper cannot be created for an element.
#[allow(unsafe_code)]
#[cfg(all(feature = "python", feature = "arrow"))]
fn py_decode_record_batch_to_custom_data(
    data_class: &pyo3::Py<pyo3::PyAny>,
    metadata: &std::collections::HashMap<String, String>,
    batch: arrow::record_batch::RecordBatch,
) -> Result<Vec<crate::data::Data>, anyhow::Error> {
    use std::sync::Arc;

    use crate::data::PythonCustomDataWrapper;

    pyo3::Python::attach(|py| {
        let struct_array: arrow::array::StructArray = batch.into();
        let array_data = arrow::array::Array::to_data(&struct_array);
        let mut ffi_array = arrow::ffi::FFI_ArrowArray::new(&array_data);

        // The array was built from a record batch, so its data type is a struct.
        let arrow::datatypes::DataType::Struct(fields) =
            arrow::array::Array::data_type(&struct_array)
        else {
            unreachable!()
        };
        let mut ffi_schema = arrow::ffi::FFI_ArrowSchema::try_from(
            arrow::datatypes::DataType::Struct(fields.clone()),
        )?;

        let array_addr = &raw mut ffi_array as usize;
        let schema_addr = &raw mut ffi_schema as usize;
        let py_batch = py
            .import("pyarrow")?
            .getattr("RecordBatch")?
            .call_method1("_import_from_c", (array_addr, schema_addr))?;

        let metadata_py = pyo3::types::PyDict::new(py);
        metadata
            .iter()
            .try_for_each(|(key, value)| metadata_py.set_item(key, value))?;

        let decoded = data_class
            .bind(py)
            .call_method1("decode_record_batch_py", (metadata_py, py_batch))
            .map_err(|e| anyhow::anyhow!("Failed to call decode_record_batch_py: {e}"))?;

        let list = decoded
            .cast::<pyo3::types::PyList>()
            .map_err(|_| anyhow::anyhow!("Expected list from decode_record_batch_py"))?;

        list.iter()
            .map(|item| -> Result<crate::data::Data, anyhow::Error> {
                let wrapper = PythonCustomDataWrapper::new(py, &item)
                    .map_err(|e| anyhow::anyhow!("Failed to create wrapper: {e}"))?;
                Ok(crate::data::Data::Custom(
                    crate::data::CustomData::from_arc(Arc::new(wrapper)),
                ))
            })
            .collect()
    })
}
537
/// Registers a Python custom data class with the data registry.
///
/// The type name is taken from the class's `type_name_static()` method when present,
/// otherwise from `__name__`. Registration then:
///
/// 1. Validates that the class exposes `from_json` (and, with the `arrow` feature,
///    `decode_record_batch_py`).
/// 2. Records the class via `register_python_data_class`.
/// 3. Registers the Python extractor when a Rust extractor exists for the type name
///    (the result of that registration is intentionally ignored).
/// 4. Registers a JSON deserializer that delegates to the class's `from_json`.
/// 5. With the `arrow` feature, registers an Arrow schema, encoder and decoder. The
///    schema comes from the class's `_schema` attribute when it exposes
///    `_export_to_c`, else from the registry, else it is an empty schema.
///
/// # Errors
///
/// Returns a type error if a required class method is missing, a runtime error if
/// the JSON or Arrow registration fails, and propagates any error raised while
/// inspecting the class or converting its schema.
#[cfg(feature = "python")]
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.model")]
pub fn register_custom_data_class(data_class: &Bound<'_, PyAny>) -> PyResult<()> {
    use std::sync::Arc;

    use crate::data::registry;

    let type_name: String = if data_class.hasattr("type_name_static")? {
        data_class.call_method0("type_name_static")?.extract()?
    } else {
        data_class.getattr("__name__")?.extract()?
    };

    #[cfg(feature = "arrow")]
    if !data_class.hasattr("decode_record_batch_py")? {
        return Err(to_pytype_err(
            "Custom data class must have decode_record_batch_py(metadata, batch) class method",
        ));
    }

    if !data_class.hasattr("from_json")? {
        return Err(to_pytype_err(
            "Custom data class must have from_json(data) class method (Rust macro provides it)",
        ));
    }

    register_python_data_class(&type_name, data_class);

    if let Some(extractor) = registry::get_rust_extractor(&type_name) {
        // Best effort: the outcome of this registration is deliberately discarded.
        let _ = registry::ensure_py_extractor_registered(&type_name, extractor);
    }

    let data_class_for_json = data_class.clone().unbind();

    // The helper attaches to the interpreter itself, so the closure only needs to
    // lend it the stored class handle.
    let json_deserializer = Box::new(
        move |value: serde_json::Value| -> Result<Arc<dyn crate::data::CustomDataTrait>, anyhow::Error> {
            py_json_deserialize_custom_data(&data_class_for_json, &value)
        },
    );

    registry::ensure_json_deserializer_registered(&type_name, json_deserializer).map_err(|e| {
        to_pyruntime_err(format!(
            "Failed to register JSON deserializer for {type_name}: {e}"
        ))
    })?;

    #[cfg(feature = "arrow")]
    {
        let data_class_for_decode = data_class.clone().unbind();

        // Only accept `_schema` when it can be exported through the Arrow C interface.
        let pyarrow_schema = data_class
            .getattr("_schema")
            .ok()
            .filter(|s| s.hasattr("_export_to_c").unwrap_or(false));
        let schema = if let Some(py_schema) = pyarrow_schema {
            Arc::new(pyarrow_schema_to_arrow_schema(&py_schema)?)
        } else if let Some(schema) = registry::get_arrow_schema(&type_name) {
            schema
        } else {
            Arc::new(arrow::datatypes::Schema::empty())
        };

        let encoder = Box::new(
            move |items: &[Arc<dyn crate::data::CustomDataTrait>]| -> Result<
                arrow::record_batch::RecordBatch,
                anyhow::Error,
            > { py_encode_custom_data_to_record_batch(items) },
        );

        // As with the JSON closure, the helper attaches to the interpreter itself.
        let decoder = Box::new(
            move |metadata: &std::collections::HashMap<String, String>,
                  batch: arrow::record_batch::RecordBatch|
                  -> Result<Vec<crate::data::Data>, anyhow::Error> {
                py_decode_record_batch_to_custom_data(&data_class_for_decode, metadata, batch)
            },
        );

        registry::ensure_arrow_registered(&type_name, schema, encoder, decoder).map_err(|e| {
            to_pyruntime_err(format!(
                "Failed to register Arrow encoder/decoder for {type_name}: {e}"
            ))
        })?;
    }

    Ok(())
}
664
665pub fn pyobjects_to_funding_rates(data: Vec<Bound<'_, PyAny>>) -> PyResult<Vec<FundingRateUpdate>> {
671 let funding_rates: Vec<FundingRateUpdate> = data
672 .into_iter()
673 .map(|obj| FundingRateUpdate::from_pyobject(&obj))
674 .collect::<PyResult<Vec<FundingRateUpdate>>>()?;
675
676 if !is_monotonically_increasing_by_init(&funding_rates) {
678 return Err(to_pyvalue_err(ERROR_MONOTONICITY));
679 }
680
681 Ok(funding_rates)
682}