Skip to main content

nautilus_model/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41    fmt::{Debug, Display},
42    hash::{Hash, Hasher},
43    str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::Value as JsonValue;
49
50#[cfg(feature = "defi")]
51use crate::defi::DefiData;
52// Re-exports
53#[rustfmt::skip]  // Keep these grouped
54pub use bar::{Bar, BarSpecification, BarType};
55pub use black_scholes::Greeks;
56pub use close::InstrumentClose;
57#[cfg(feature = "python")]
58pub use custom::PythonCustomDataWrapper;
59pub use custom::{
60    CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
61};
62#[cfg(feature = "python")]
63pub use custom::{
64    get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
65};
66pub use delta::OrderBookDelta;
67pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
68pub use depth::{DEPTH10_LEN, OrderBookDepth10};
69pub use forward::ForwardPrice;
70pub use funding::FundingRateUpdate;
71pub use greeks::{
72    BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
73    YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
74};
75pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
76pub use order::{BookOrder, NULL_ORDER};
77pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
78pub use quote::QuoteTick;
79#[cfg(feature = "arrow")]
80pub use registry::{
81    ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, encode_custom_to_arrow,
82    ensure_arrow_registered, get_arrow_schema, register_arrow,
83};
84#[cfg(feature = "python")]
85pub use registry::{
86    PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
87    ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
88    register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
89};
90pub use registry::{
91    deserialize_custom_from_json, ensure_json_deserializer_registered, register_json_deserializer,
92};
93pub use status::InstrumentStatus;
94pub use trade::TradeTick;
95
96use crate::identifiers::{InstrumentId, Venue};
/// A built-in Nautilus data type.
///
/// Not recommended for storing large amounts of data, as the largest variant is significantly
/// larger (10x) than the smallest.
///
/// Only `Debug` is derived here; `Clone`, `PartialEq`, `Serialize` and `Deserialize` are
/// implemented by hand elsewhere in this module.
#[derive(Debug)]
pub enum Data {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a boxed [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>), // This variant is significantly larger
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate), // TODO: Rename to MarkPrice once Cython gone
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate), // TODO: Rename to IndexPrice once Cython gone
    /// Wraps a [`FundingRateUpdate`].
    FundingRateUpdate(FundingRateUpdate),
    /// Wraps an [`OptionGreeks`].
    OptionGreeks(OptionGreeks),
    /// Wraps an [`InstrumentStatus`].
    InstrumentStatus(InstrumentStatus),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
    /// Wraps a user-defined [`CustomData`] payload.
    Custom(CustomData),
    /// Wraps a boxed `DefiData`; only present when the `defi` feature is enabled.
    #[cfg(feature = "defi")]
    Defi(Box<DefiData>), // This variant is significantly larger
}
119
/// A C-compatible representation of [`Data`] for FFI.
///
/// This enum mirrors only a subset of the [`Data`] variants. It excludes the `Custom`
/// variant, which is not FFI-safe, and also has no counterpart for `FundingRateUpdate`,
/// `OptionGreeks`, `InstrumentStatus` or (with the `defi` feature) `Defi`. Converting any
/// of those with `TryFrom<Data>` returns an error.
#[cfg(feature = "ffi")]
#[repr(C)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
pub enum DataFFI {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a boxed [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>),
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate),
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
}
139
#[cfg(feature = "ffi")]
impl TryFrom<Data> for DataFFI {
    type Error = anyhow::Error;

    /// Converts a [`Data`] value into its FFI representation.
    ///
    /// # Errors
    ///
    /// Returns an error for the variants that have no [`DataFFI`] counterpart:
    /// `FundingRateUpdate`, `OptionGreeks`, `InstrumentStatus`, `Custom` and `Defi`.
    fn try_from(value: Data) -> Result<Self, Self::Error> {
        let converted = match value {
            // Variants with a one-to-one FFI counterpart
            Data::Delta(delta) => Self::Delta(delta),
            Data::Deltas(deltas) => Self::Deltas(deltas),
            Data::Depth10(depth) => Self::Depth10(depth),
            Data::Quote(quote) => Self::Quote(quote),
            Data::Trade(trade) => Self::Trade(trade),
            Data::Bar(bar) => Self::Bar(bar),
            Data::MarkPriceUpdate(mark_price) => Self::MarkPriceUpdate(mark_price),
            Data::IndexPriceUpdate(index_price) => Self::IndexPriceUpdate(index_price),
            Data::InstrumentClose(close) => Self::InstrumentClose(close),
            // Variants that cannot cross the FFI boundary
            Data::FundingRateUpdate(_) => {
                anyhow::bail!("Cannot convert Data::FundingRateUpdate to DataFFI")
            }
            Data::OptionGreeks(_) => {
                anyhow::bail!("Cannot convert Data::OptionGreeks to DataFFI")
            }
            Data::InstrumentStatus(_) => {
                anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
            }
            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
            #[cfg(feature = "defi")]
            Data::Defi(_) => anyhow::bail!("Cannot convert Data::Defi to DataFFI"),
        };

        Ok(converted)
    }
}
170
#[cfg(feature = "ffi")]
impl From<DataFFI> for Data {
    /// Converts an FFI value back into [`Data`]; every [`DataFFI`] variant maps to the
    /// [`Data`] variant of the same name, so this conversion cannot fail.
    fn from(value: DataFFI) -> Self {
        match value {
            DataFFI::Delta(delta) => Self::Delta(delta),
            DataFFI::Deltas(deltas) => Self::Deltas(deltas),
            DataFFI::Depth10(depth) => Self::Depth10(depth),
            DataFFI::Quote(quote) => Self::Quote(quote),
            DataFFI::Trade(trade) => Self::Trade(trade),
            DataFFI::Bar(bar) => Self::Bar(bar),
            DataFFI::MarkPriceUpdate(mark_price) => Self::MarkPriceUpdate(mark_price),
            DataFFI::IndexPriceUpdate(index_price) => Self::IndexPriceUpdate(index_price),
            DataFFI::InstrumentClose(close) => Self::InstrumentClose(close),
        }
    }
}
187
188impl<'de> Deserialize<'de> for Data {
189    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
190    where
191        D: serde::Deserializer<'de>,
192    {
193        use serde::de::Error;
194        let value = serde_json::Value::deserialize(deserializer)?;
195        let type_name = value
196            .get("type")
197            .and_then(|v| v.as_str())
198            .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
199            .to_string();
200
201        match type_name.as_str() {
202            "OrderBookDelta" => Ok(Self::Delta(
203                serde_json::from_value(value).map_err(D::Error::custom)?,
204            )),
205            "OrderBookDeltas" => Ok(Self::Deltas(
206                serde_json::from_value(value).map_err(D::Error::custom)?,
207            )),
208            "OrderBookDepth10" => Ok(Self::Depth10(
209                serde_json::from_value(value).map_err(D::Error::custom)?,
210            )),
211            "QuoteTick" => Ok(Self::Quote(
212                serde_json::from_value(value).map_err(D::Error::custom)?,
213            )),
214            "TradeTick" => Ok(Self::Trade(
215                serde_json::from_value(value).map_err(D::Error::custom)?,
216            )),
217            "Bar" => Ok(Self::Bar(
218                serde_json::from_value(value).map_err(D::Error::custom)?,
219            )),
220            "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
221                serde_json::from_value(value).map_err(D::Error::custom)?,
222            )),
223            "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
224                serde_json::from_value(value).map_err(D::Error::custom)?,
225            )),
226            "FundingRateUpdate" => Ok(Self::FundingRateUpdate(
227                serde_json::from_value(value).map_err(D::Error::custom)?,
228            )),
229            "OptionGreeks" => Ok(Self::OptionGreeks(
230                serde_json::from_value(value).map_err(D::Error::custom)?,
231            )),
232            "InstrumentStatus" => Ok(Self::InstrumentStatus(
233                serde_json::from_value(value).map_err(D::Error::custom)?,
234            )),
235            "InstrumentClose" => Ok(Self::InstrumentClose(
236                serde_json::from_value(value).map_err(D::Error::custom)?,
237            )),
238            _ => {
239                if let Some(data) =
240                    deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
241                {
242                    Ok(data)
243                } else {
244                    Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
245                }
246            }
247        }
248    }
249}
250
251impl Clone for Data {
252    fn clone(&self) -> Self {
253        match self {
254            Self::Delta(x) => Self::Delta(*x),
255            Self::Deltas(x) => Self::Deltas(x.clone()),
256            Self::Depth10(x) => Self::Depth10(x.clone()),
257            Self::Quote(x) => Self::Quote(*x),
258            Self::Trade(x) => Self::Trade(*x),
259            Self::Bar(x) => Self::Bar(*x),
260            Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
261            Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
262            Self::FundingRateUpdate(x) => Self::FundingRateUpdate(*x),
263            Self::OptionGreeks(x) => Self::OptionGreeks(*x),
264            Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
265            Self::InstrumentClose(x) => Self::InstrumentClose(*x),
266            Self::Custom(x) => Self::Custom(x.clone()),
267            #[cfg(feature = "defi")]
268            Self::Defi(x) => Self::Defi(x.clone()),
269        }
270    }
271}
272
273impl PartialEq for Data {
274    fn eq(&self, other: &Self) -> bool {
275        match (self, other) {
276            (Self::Delta(a), Self::Delta(b)) => a == b,
277            (Self::Deltas(a), Self::Deltas(b)) => a == b,
278            (Self::Depth10(a), Self::Depth10(b)) => a == b,
279            (Self::Quote(a), Self::Quote(b)) => a == b,
280            (Self::Trade(a), Self::Trade(b)) => a == b,
281            (Self::Bar(a), Self::Bar(b)) => a == b,
282            (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
283            (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
284            (Self::FundingRateUpdate(a), Self::FundingRateUpdate(b)) => a == b,
285            (Self::OptionGreeks(a), Self::OptionGreeks(b)) => a == b,
286            (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
287            (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
288            (Self::Custom(a), Self::Custom(b)) => a == b,
289            #[cfg(feature = "defi")]
290            (Self::Defi(a), Self::Defi(b)) => a == b,
291            _ => false,
292        }
293    }
294}
295
296impl Serialize for Data {
297    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
298    where
299        S: serde::Serializer,
300    {
301        match self {
302            Self::Delta(x) => x.serialize(serializer),
303            Self::Deltas(x) => x.serialize(serializer),
304            Self::Depth10(x) => x.serialize(serializer),
305            Self::Quote(x) => x.serialize(serializer),
306            Self::Trade(x) => x.serialize(serializer),
307            Self::Bar(x) => x.serialize(serializer),
308            Self::MarkPriceUpdate(x) => x.serialize(serializer),
309            Self::IndexPriceUpdate(x) => x.serialize(serializer),
310            Self::FundingRateUpdate(x) => x.serialize(serializer),
311            Self::OptionGreeks(x) => x.serialize(serializer),
312            Self::InstrumentStatus(x) => x.serialize(serializer),
313            Self::InstrumentClose(x) => x.serialize(serializer),
314            Self::Custom(x) => x.serialize(serializer),
315            #[cfg(feature = "defi")]
316            Self::Defi(_) => Err(serde::ser::Error::custom(
317                "Data::Defi serialization is not supported",
318            )),
319        }
320    }
321}
322
// Generates `TryFrom<Data> for $type`: yields the payload when the value is
// `Data::$variant` and `Err(())` for every other variant.
macro_rules! impl_try_from_data {
    ($variant:ident, $type:ty) => {
        impl TryFrom<Data> for $type {
            type Error = ();

            fn try_from(value: Data) -> Result<Self, Self::Error> {
                if let Data::$variant(inner) = value {
                    Ok(inner)
                } else {
                    Err(())
                }
            }
        }
    };
}
337
338impl TryFrom<Data> for OrderBookDepth10 {
339    type Error = ();
340
341    fn try_from(value: Data) -> Result<Self, Self::Error> {
342        match value {
343            Data::Depth10(x) => Ok(*x),
344            _ => Err(()),
345        }
346    }
347}
348
// Generate `TryFrom<Data>` for every payload stored inline in its variant.
// `OrderBookDepth10` is boxed inside `Data::Depth10`, so it uses the hand-written
// implementation above instead of this macro.
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);
impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
impl_try_from_data!(FundingRateUpdate, FundingRateUpdate);
impl_try_from_data!(OptionGreeks, OptionGreeks);
impl_try_from_data!(InstrumentStatus, InstrumentStatus);
impl_try_from_data!(InstrumentClose, InstrumentClose);
360
361/// Converts a vector of `Data` items to a specific variant type.
362///
363/// Filters and converts the data vector, keeping only items that can be
364/// successfully converted to the target type `T`.
365#[must_use]
366pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
367    data.into_iter()
368        .filter_map(|d| T::try_from(d).ok())
369        .collect()
370}
371
372impl Data {
373    /// Returns the instrument ID for the data.
374    #[must_use]
375    pub fn instrument_id(&self) -> InstrumentId {
376        match self {
377            Self::Delta(delta) => delta.instrument_id,
378            Self::Deltas(deltas) => deltas.instrument_id,
379            Self::Depth10(depth) => depth.instrument_id,
380            Self::Quote(quote) => quote.instrument_id,
381            Self::Trade(trade) => trade.instrument_id,
382            Self::Bar(bar) => bar.bar_type.instrument_id(),
383            Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
384            Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
385            Self::FundingRateUpdate(funding_rate) => funding_rate.instrument_id,
386            Self::OptionGreeks(greeks) => greeks.instrument_id,
387            Self::InstrumentStatus(status) => status.instrument_id,
388            Self::InstrumentClose(close) => close.instrument_id,
389            Self::Custom(custom) => custom
390                .data_type
391                .identifier()
392                .and_then(|s| InstrumentId::from_str(s).ok())
393                .or_else(|| {
394                    custom
395                        .data_type
396                        .metadata()
397                        .and_then(|m| m.get_str("instrument_id"))
398                        .and_then(|s| InstrumentId::from_str(s).ok())
399                })
400                .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
401            #[cfg(feature = "defi")]
402            Self::Defi(defi) => defi.instrument_id(),
403        }
404    }
405
406    /// Returns whether the data is a type of order book data.
407    #[must_use]
408    pub fn is_order_book_data(&self) -> bool {
409        matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
410    }
411}
412
/// Trait for types that carry an initialization timestamp.
///
/// `ts_init` is the moment (UNIX nanoseconds) when this value was first generated or
/// ingested by Nautilus. It can be used for sequencing, latency measurements,
/// or monitoring data-pipeline delays.
///
/// Implementors must provide [`HasTsInit::ts_init`]; the trait has no default methods.
pub trait HasTsInit {
    /// Returns the UNIX timestamp (nanoseconds) when the instance was created.
    fn ts_init(&self) -> UnixNanos;
}
422
/// Trait for data types that have a catalog path prefix.
///
/// The prefix is an associated function rather than a method, so it is available from the
/// type alone without an instance.
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    fn path_prefix() -> &'static str;
}
428
/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes. Each invocation implements the trait for
/// exactly one type.
///
/// The generated impl names the trait as `$crate::data::CatalogPathPrefix`, so the trait
/// does not need to be imported at the call site.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type; it is returned as a `&'static str`.
#[macro_export]
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl $crate::data::CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}
448
// Standard implementations for financial data types.
// Each string literal is the value returned by `CatalogPathPrefix::path_prefix` for that type.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
impl_catalog_path_prefix!(OptionGreeks, "option_greeks");
impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

// Instruments are not a `Data` variant but also get a catalog path prefix.
use crate::instruments::InstrumentAny;
impl_catalog_path_prefix!(InstrumentAny, "instruments");
464
465impl HasTsInit for Data {
466    fn ts_init(&self) -> UnixNanos {
467        match self {
468            Self::Delta(d) => d.ts_init,
469            Self::Deltas(d) => d.ts_init,
470            Self::Depth10(d) => d.ts_init,
471            Self::Quote(q) => q.ts_init,
472            Self::Trade(t) => t.ts_init,
473            Self::Bar(b) => b.ts_init,
474            Self::MarkPriceUpdate(p) => p.ts_init,
475            Self::IndexPriceUpdate(p) => p.ts_init,
476            Self::FundingRateUpdate(f) => f.ts_init,
477            Self::OptionGreeks(g) => g.ts_init,
478            Self::InstrumentStatus(s) => s.ts_init,
479            Self::InstrumentClose(c) => c.ts_init,
480            Self::Custom(c) => c.data.ts_init(),
481            #[cfg(feature = "defi")]
482            Self::Defi(d) => d.ts_init(),
483        }
484    }
485}
486
487/// Checks if the data slice is monotonically increasing by initialization timestamp.
488///
489/// Returns `true` if each element's `ts_init` is less than or equal to the next element's `ts_init`.
490pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
491    data.array_windows()
492        .all(|[a, b]| a.ts_init() <= b.ts_init())
493}
494
495impl From<OrderBookDelta> for Data {
496    fn from(value: OrderBookDelta) -> Self {
497        Self::Delta(value)
498    }
499}
500
501impl From<OrderBookDeltas_API> for Data {
502    fn from(value: OrderBookDeltas_API) -> Self {
503        Self::Deltas(value)
504    }
505}
506
507impl From<OrderBookDepth10> for Data {
508    fn from(value: OrderBookDepth10) -> Self {
509        Self::Depth10(Box::new(value))
510    }
511}
512
513impl From<QuoteTick> for Data {
514    fn from(value: QuoteTick) -> Self {
515        Self::Quote(value)
516    }
517}
518
519impl From<TradeTick> for Data {
520    fn from(value: TradeTick) -> Self {
521        Self::Trade(value)
522    }
523}
524
525impl From<Bar> for Data {
526    fn from(value: Bar) -> Self {
527        Self::Bar(value)
528    }
529}
530
531impl From<MarkPriceUpdate> for Data {
532    fn from(value: MarkPriceUpdate) -> Self {
533        Self::MarkPriceUpdate(value)
534    }
535}
536
537impl From<IndexPriceUpdate> for Data {
538    fn from(value: IndexPriceUpdate) -> Self {
539        Self::IndexPriceUpdate(value)
540    }
541}
542
543impl From<FundingRateUpdate> for Data {
544    fn from(value: FundingRateUpdate) -> Self {
545        Self::FundingRateUpdate(value)
546    }
547}
548
549impl From<OptionGreeks> for Data {
550    fn from(value: OptionGreeks) -> Self {
551        Self::OptionGreeks(value)
552    }
553}
554
555impl From<InstrumentStatus> for Data {
556    fn from(value: InstrumentStatus) -> Self {
557        Self::InstrumentStatus(value)
558    }
559}
560
561impl From<InstrumentClose> for Data {
562    fn from(value: InstrumentClose) -> Self {
563        Self::InstrumentClose(value)
564    }
565}
566
567#[cfg(feature = "defi")]
568impl From<DefiData> for Data {
569    fn from(value: DefiData) -> Self {
570        Self::Defi(Box::new(value))
571    }
572}
573
574/// Builds a string-only view of a JSON value for use in topic (key=value).
575fn value_to_topic_string(v: &JsonValue) -> String {
576    if let Some(s) = v.as_str() {
577        return s.to_string();
578    }
579
580    if let Some(n) = v.as_u64() {
581        return n.to_string();
582    }
583
584    if let Some(n) = v.as_i64() {
585        return n.to_string();
586    }
587
588    if let Some(b) = v.as_bool() {
589        return b.to_string();
590    }
591
592    if let Some(f) = v.as_f64() {
593        return f.to_string();
594    }
595
596    if v.is_null() {
597        return "null".to_string();
598    }
599    serde_json::to_string(v).unwrap_or_default()
600}
601
602/// Builds the topic suffix from Params (string-only view: key=value joined by ".").
603fn params_to_topic_suffix(params: &Params) -> String {
604    let mut entries = params.iter().collect::<Vec<_>>();
605    entries.sort_by_key(|(key, _)| *key);
606
607    entries
608        .into_iter()
609        .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
610        .collect::<Vec<_>>()
611        .join(".")
612}
613
/// Represents a data type including metadata.
///
/// The `topic` and `hash` fields are computed once at construction time. Equality,
/// ordering and hashing of a [`DataType`] are all based on the topic.
#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct DataType {
    /// Name of the data type.
    type_name: String,
    /// Optional metadata parameters for the data type.
    metadata: Option<Params>,
    /// Messaging topic for the data type.
    topic: String,
    /// Hash of `topic`, computed when the instance is constructed.
    hash: u64,
    /// Optional catalog path identifier.
    identifier: Option<String>,
}
631
632impl DataType {
633    /// Creates a new [`DataType`] instance.
634    #[must_use]
635    pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
636        // Precompute topic from type_name + metadata (string-only view for backward compatibility)
637        let topic = if let Some(ref meta) = metadata {
638            if meta.is_empty() {
639                type_name.to_string()
640            } else {
641                format!("{type_name}.{}", params_to_topic_suffix(meta))
642            }
643        } else {
644            type_name.to_string()
645        };
646
647        // Precompute hash
648        let mut hasher = std::collections::hash_map::DefaultHasher::new();
649        topic.hash(&mut hasher);
650
651        Self {
652            type_name: type_name.to_owned(),
653            metadata,
654            topic,
655            hash: hasher.finish(),
656            identifier,
657        }
658    }
659
660    /// Creates a [`DataType`] from persisted parts (`type_name`, topic, metadata).
661    /// Hash is recomputed from topic. Use when restoring from legacy `data_type` column.
662    /// Identifier is set to None.
663    #[must_use]
664    pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
665        let mut hasher = std::collections::hash_map::DefaultHasher::new();
666        topic.hash(&mut hasher);
667        Self {
668            type_name: type_name.to_owned(),
669            metadata,
670            topic: topic.to_owned(),
671            hash: hasher.finish(),
672            identifier: None,
673        }
674    }
675
676    /// Serializes to JSON for persistence (`type_name`, metadata, identifier; no topic, no hash).
677    ///
678    /// # Errors
679    ///
680    /// Returns a JSON serialization error if the data cannot be serialized.
681    pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
682        let mut map = serde_json::Map::new();
683        map.insert(
684            "type_name".to_string(),
685            serde_json::Value::String(self.type_name.clone()),
686        );
687        map.insert(
688            "metadata".to_string(),
689            self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
690                serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
691            }),
692        );
693
694        if let Some(ref id) = self.identifier {
695            map.insert(
696                "identifier".to_string(),
697                serde_json::Value::String(id.clone()),
698            );
699        }
700        serde_json::to_string(&serde_json::Value::Object(map))
701    }
702
703    /// Deserializes from JSON produced by `to_persistence_json`.
704    /// Accepts legacy JSON with `topic` (ignored); topic is rebuilt from `type_name` + metadata.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the string is not valid JSON or missing required fields.
709    pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
710        let value: serde_json::Value =
711            serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
712        let obj = value
713            .as_object()
714            .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
715        let type_name = obj
716            .get("type_name")
717            .and_then(|v| v.as_str())
718            .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
719            .to_string();
720        let metadata = obj.get("metadata").and_then(|m| {
721            if m.is_null() {
722                None
723            } else {
724                let p: Params = serde_json::from_value(m.clone()).ok()?;
725                if p.is_empty() { None } else { Some(p) }
726            }
727        });
728        let identifier = obj
729            .get("identifier")
730            .and_then(|v| v.as_str())
731            .map(String::from);
732        Ok(Self::new(&type_name, metadata, identifier))
733    }
734
735    /// Returns the type name for the data type.
736    #[must_use]
737    pub fn type_name(&self) -> &str {
738        self.type_name.as_str()
739    }
740
741    /// Returns the metadata for the data type.
742    #[must_use]
743    pub fn metadata(&self) -> Option<&Params> {
744        self.metadata.as_ref()
745    }
746
747    /// Returns a string representation of the metadata.
748    #[must_use]
749    pub fn metadata_str(&self) -> String {
750        self.metadata.as_ref().map_or_else(
751            || "null".to_string(),
752            |metadata| {
753                let mut entries = metadata.iter().collect::<Vec<_>>();
754                entries.sort_by_key(|(key, _)| *key);
755
756                let mut metadata_map = serde_json::Map::new();
757                for (key, value) in entries {
758                    metadata_map.insert(key.clone(), value.clone());
759                }
760
761                serde_json::to_string(&metadata_map).unwrap_or_default()
762            },
763        )
764    }
765
766    /// Returns metadata as a string-only map (e.g. for Arrow schema metadata).
767    #[must_use]
768    pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
769        self.metadata.as_ref().map(|p| {
770            p.iter()
771                .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
772                .collect()
773        })
774    }
775
776    /// Returns the precomputed hash for this data type.
777    #[must_use]
778    pub fn precomputed_hash(&self) -> u64 {
779        self.hash
780    }
781
782    /// Returns the messaging topic for the data type.
783    #[must_use]
784    pub fn topic(&self) -> &str {
785        self.topic.as_str()
786    }
787
788    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
789    #[must_use]
790    pub fn identifier(&self) -> Option<&str> {
791        self.identifier.as_deref()
792    }
793
794    /// Returns an [`Option<InstrumentId>`] parsed from the metadata.
795    ///
796    /// # Panics
797    ///
798    /// This function panics if:
799    /// - The `instrument_id` value contained in the metadata is invalid.
800    #[must_use]
801    pub fn instrument_id(&self) -> Option<InstrumentId> {
802        let metadata = self.metadata.as_ref()?;
803        let instrument_id = metadata.get_str("instrument_id")?;
804        Some(
805            InstrumentId::from_str(instrument_id)
806                .expect("Invalid `InstrumentId` for 'instrument_id'"),
807        )
808    }
809
810    /// Returns an [`Option<Venue>`] parsed from the metadata.
811    ///
812    /// # Panics
813    ///
814    /// This function panics if:
815    /// - The `venue` value contained in the metadata is invalid.
816    #[must_use]
817    pub fn venue(&self) -> Option<Venue> {
818        let metadata = self.metadata.as_ref()?;
819        let venue_str = metadata.get_str("venue")?;
820        Some(Venue::from(venue_str))
821    }
822
823    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `start` field.
824    ///
825    /// # Panics
826    ///
827    /// This function panics if:
828    /// - The `start` value contained in the metadata is invalid.
829    #[must_use]
830    pub fn start(&self) -> Option<UnixNanos> {
831        let metadata = self.metadata.as_ref()?;
832        let start_str = metadata.get_str("start")?;
833        Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
834    }
835
836    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `end` field.
837    ///
838    /// # Panics
839    ///
840    /// This function panics if:
841    /// - The `end` value contained in the metadata is invalid.
842    #[must_use]
843    pub fn end(&self) -> Option<UnixNanos> {
844        let metadata = self.metadata.as_ref()?;
845        let end_str = metadata.get_str("end")?;
846        Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
847    }
848
849    /// Returns an [`Option<usize>`] parsed from the metadata `limit` field.
850    ///
851    /// # Panics
852    ///
853    /// This function panics if:
854    /// - The `limit` value contained in the metadata is invalid.
855    #[must_use]
856    pub fn limit(&self) -> Option<usize> {
857        let metadata = self.metadata.as_ref()?;
858        metadata.get_usize("limit").or_else(|| {
859            metadata
860                .get_str("limit")
861                .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
862        })
863    }
864}
865
866impl PartialEq for DataType {
867    fn eq(&self, other: &Self) -> bool {
868        self.topic == other.topic
869    }
870}
871
// Marker impl: equality is whatever the `PartialEq` impl for `DataType` defines (a comparison
// of the `topic` fields), and no extra state takes part in it.
impl Eq for DataType {}
873
874impl PartialOrd for DataType {
875    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
876        Some(self.cmp(other))
877    }
878}
879
880impl Ord for DataType {
881    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
882        self.topic.cmp(&other.topic)
883    }
884}
885
886impl Hash for DataType {
887    fn hash<H: Hasher>(&self, state: &mut H) {
888        self.hash.hash(state);
889    }
890}
891
892impl Display for DataType {
893    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
894        write!(f, "{}", self.topic)
895    }
896}
897
898impl Debug for DataType {
899    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
900        write!(
901            f,
902            "DataType(type_name={}, metadata={:?}, identifier={:?})",
903            self.type_name, self.metadata, self.identifier
904        )
905    }
906}
907
#[cfg(test)]
mod tests {
    use std::hash::DefaultHasher;

    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Builds a [`Params`] value from a JSON literal, panicking if it does not deserialize.
    fn params_from_json(value: serde_json::Value) -> Params {
        serde_json::from_value(value).expect("valid Params JSON")
    }

    /// Hashes a [`DataType`] with a fresh [`DefaultHasher`] and returns the digest.
    fn hash_of(data_type: &DataType) -> u64 {
        let mut hasher = DefaultHasher::new();
        data_type.hash(&mut hasher);
        hasher.finish()
    }

    /// A funding rate update must be rejected when converting to the FFI data enum.
    #[cfg(feature = "ffi")]
    #[rstest]
    fn test_funding_rate_update_does_not_convert_to_data_ffi() {
        let funding_rate = FundingRateUpdate::new(
            InstrumentId::from("BTCUSDT-PERP.BINANCE"),
            "0.0001".parse().unwrap(),
            Some(480),
            Some(UnixNanos::from(1_000_000_000)),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );

        let err = DataFFI::try_from(Data::FundingRateUpdate(funding_rate)).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Cannot convert Data::FundingRateUpdate to DataFFI"
        );
    }

    /// Metadata entries are appended to the type name to form the topic.
    #[rstest]
    fn test_data_type_creation_with_metadata() {
        let metadata = Some(params_from_json(
            json!({"key1": "value1", "key2": "value2"}),
        ));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
        assert_eq!(data_type.metadata(), metadata.as_ref());
    }

    /// Insertion order of metadata keys must not affect topic, equality, hash or display.
    #[rstest]
    fn test_data_type_topic_identity_uses_canonical_metadata_order() {
        let mut metadata1 = Params::new();
        metadata1.insert("b".to_string(), json!(2));
        metadata1.insert("a".to_string(), json!(1));
        let mut metadata2 = Params::new();
        metadata2.insert("a".to_string(), json!(1));
        metadata2.insert("b".to_string(), json!(2));

        let data_type1 = DataType::new("ExampleType", Some(metadata1), None);
        let data_type2 = DataType::new("ExampleType", Some(metadata2), None);

        assert_eq!(data_type1.topic(), "ExampleType.a=1.b=2");
        assert_eq!(data_type1.topic(), data_type2.topic());
        assert_eq!(data_type1, data_type2);
        assert_eq!(hash_of(&data_type1), hash_of(&data_type2));
        assert_eq!(format!("{data_type1}"), format!("{data_type2}"));
        assert_eq!(data_type1.metadata_str(), r#"{"a":1,"b":2}"#);
        assert_eq!(data_type1.metadata_str(), data_type2.metadata_str());
    }

    /// Without metadata the topic is just the type name.
    #[rstest]
    fn test_data_type_creation_without_metadata() {
        let data_type = DataType::new("ExampleType", None, None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType");
        assert_eq!(data_type.metadata(), None);
    }

    /// Identical type name and metadata compare equal.
    #[rstest]
    fn test_data_type_equality() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key1": "value1"})));

        let data_type1 = DataType::new("ExampleType", metadata1, None);
        let data_type2 = DataType::new("ExampleType", metadata2, None);

        assert_eq!(data_type1, data_type2);
    }

    /// Differing metadata makes two data types unequal.
    #[rstest]
    fn test_data_type_inequality() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));

        let data_type1 = DataType::new("ExampleType", metadata1, None);
        let data_type2 = DataType::new("ExampleType", metadata2, None);

        assert_ne!(data_type1, data_type2);
    }

    /// Ordering follows the topic, so `ExampleTypeA...` sorts before `ExampleTypeB...`.
    #[rstest]
    fn test_data_type_ordering() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));

        let data_type1 = DataType::new("ExampleTypeA", metadata1, None);
        let data_type2 = DataType::new("ExampleTypeB", metadata2, None);

        assert!(data_type1 < data_type2);
    }

    /// Equal data types must produce equal hashes.
    #[rstest]
    fn test_data_type_hash() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));

        let data_type1 = DataType::new("ExampleType", metadata.clone(), None);
        let data_type2 = DataType::new("ExampleType", metadata, None);

        assert_eq!(hash_of(&data_type1), hash_of(&data_type2));
    }

    /// `Display` renders the topic.
    #[rstest]
    fn test_data_type_display() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        let data_type = DataType::new("ExampleType", metadata, None);

        assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
    }

    /// `Debug` renders type name, metadata and identifier.
    #[rstest]
    fn test_data_type_debug() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        assert_eq!(
            format!("{data_type:?}"),
            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)")
        );
    }

    /// The `instrument_id` metadata entry is parsed into an `InstrumentId`.
    #[rstest]
    fn test_parse_instrument_id_from_metadata() {
        let instrument_id_str = "MSFT.XNAS";
        let metadata = Some(params_from_json(
            json!({"instrument_id": instrument_id_str}),
        ));
        let data_type = DataType::new("InstrumentAny", metadata, None);

        assert_eq!(
            data_type.instrument_id().unwrap(),
            InstrumentId::from_str(instrument_id_str).unwrap()
        );
    }

    /// The `venue` metadata entry is converted into a `Venue`.
    #[rstest]
    fn test_parse_venue_from_metadata() {
        let venue_str = "BINANCE";
        let metadata = Some(params_from_json(json!({"venue": venue_str})));
        let data_type = DataType::new(stringify!(InstrumentAny), metadata, None);

        assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
    }

    /// The `start` metadata entry (stored as a string) is parsed into `UnixNanos`.
    #[rstest]
    fn test_parse_start_from_metadata() {
        let start_ns = 1_600_054_595_844_758_000;
        let metadata = Some(params_from_json(json!({"start": start_ns.to_string()})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns));
    }

    /// The `end` metadata entry (stored as a string) is parsed into `UnixNanos`.
    #[rstest]
    fn test_parse_end_from_metadata() {
        let end_ns = 1_720_954_595_844_758_000;
        let metadata = Some(params_from_json(json!({"end": end_ns.to_string()})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns));
    }

    /// A numeric `limit` metadata entry is returned as-is.
    #[rstest]
    fn test_parse_limit_from_metadata() {
        let limit = 1000;
        let metadata = Some(params_from_json(json!({"limit": limit})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.limit().unwrap(), limit);
    }

    /// A `limit` stored as a decimal string is accepted through the string fallback.
    #[rstest]
    fn test_parse_limit_from_string_metadata() {
        let metadata = Some(params_from_json(json!({"limit": "1000"})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.limit(), Some(1000));
    }

    /// A `limit` string that is not a valid `usize` triggers the documented panic.
    #[rstest]
    #[should_panic(expected = "Invalid `usize` for 'limit'")]
    fn test_parse_invalid_limit_string_panics() {
        let metadata = Some(params_from_json(json!({"limit": "not-a-number"})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        let _ = data_type.limit();
    }

    /// Every metadata accessor yields `None` when no metadata is attached.
    #[rstest]
    fn test_data_type_metadata_accessors_return_none_without_metadata() {
        let data_type = DataType::new(stringify!(TradeTick), None, None);

        assert_eq!(data_type.instrument_id(), None);
        assert_eq!(data_type.venue(), None);
        assert_eq!(data_type.start(), None);
        assert_eq!(data_type.end(), None);
        assert_eq!(data_type.limit(), None);
    }

    /// The identifier survives a persistence round trip and the topic is not serialized.
    #[rstest]
    fn test_data_type_persistence_json_with_identifier() {
        let data_type = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
        let json = data_type.to_persistence_json().unwrap();
        assert!(!json.contains("topic"));
        assert!(json.contains("\"identifier\":\"venue//symbol\""));
        let restored = DataType::from_persistence_json(&json).unwrap();
        assert_eq!(restored.type_name(), "MyCustomType");
        assert_eq!(restored.identifier(), Some("venue//symbol"));
        assert_eq!(restored.topic(), "MyCustomType");
    }

    /// A non-canonical `topic` in persisted JSON is ignored; the topic is rebuilt from metadata.
    #[rstest]
    fn test_data_type_from_persistence_json_rebuilds_canonical_topic() {
        let json = r#"{
            "type_name": "ExampleType",
            "topic": "ExampleType.z=9.a=1",
            "metadata": {"z": 9, "a": 1}
        }"#;

        let restored = DataType::from_persistence_json(json).unwrap();

        assert_eq!(restored.topic(), "ExampleType.a=1.z=9");
    }

    /// `identifier()` exposes the optional identifier as a borrowed string.
    #[rstest]
    fn test_data_type_identifier_getter() {
        let data_type = DataType::new("T", None, Some("id".to_string()));
        assert_eq!(data_type.identifier(), Some("id"));
        let data_type_no_id = DataType::new("T", None, None);
        assert_eq!(data_type_no_id.identifier(), None);
    }
}