Skip to main content

nautilus_model/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41    fmt::{Debug, Display},
42    hash::{Hash, Hasher},
43    str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50// Re-exports
51#[rustfmt::skip]  // Keep these grouped
52pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58    CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62    get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70    BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71    YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77pub use registry::{
78    ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, deserialize_custom_from_json,
79    encode_custom_to_arrow, ensure_arrow_registered, ensure_json_deserializer_registered,
80    get_arrow_schema, register_arrow, register_json_deserializer,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84    PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85    ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86    register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use status::InstrumentStatus;
89pub use trade::TradeTick;
90
91use crate::identifiers::{InstrumentId, Venue};
92/// A built-in Nautilus data type.
93///
94/// Not recommended for storing large amounts of data, as the largest variant is significantly
95/// larger (10x) than the smallest.
96#[derive(Debug)]
97pub enum Data {
98    Delta(OrderBookDelta),
99    Deltas(OrderBookDeltas_API),
100    Depth10(Box<OrderBookDepth10>), // This variant is significantly larger
101    Quote(QuoteTick),
102    Trade(TradeTick),
103    Bar(Bar),
104    MarkPriceUpdate(MarkPriceUpdate), // TODO: Rename to MarkPrice once Cython gone
105    IndexPriceUpdate(IndexPriceUpdate), // TODO: Rename to IndexPrice once Cython gone
106    InstrumentClose(InstrumentClose),
107    Custom(CustomData),
108}
109
110/// A C-compatible representation of [`Data`] for FFI.
111///
112/// This enum matches the standard variants of [`Data`] but excludes the `Custom`
113/// variant which is not FFI-safe.
114#[cfg(feature = "ffi")]
115#[repr(C)]
116#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
117#[allow(non_camel_case_types)]
118pub enum DataFFI {
119    Delta(OrderBookDelta),
120    Deltas(OrderBookDeltas_API),
121    Depth10(Box<OrderBookDepth10>),
122    Quote(QuoteTick),
123    Trade(TradeTick),
124    Bar(Bar),
125    MarkPriceUpdate(MarkPriceUpdate),
126    IndexPriceUpdate(IndexPriceUpdate),
127    InstrumentClose(InstrumentClose),
128}
129
130#[cfg(feature = "ffi")]
131impl TryFrom<Data> for DataFFI {
132    type Error = anyhow::Error;
133
134    fn try_from(value: Data) -> Result<Self, Self::Error> {
135        match value {
136            Data::Delta(x) => Ok(Self::Delta(x)),
137            Data::Deltas(x) => Ok(Self::Deltas(x)),
138            Data::Depth10(x) => Ok(Self::Depth10(x)),
139            Data::Quote(x) => Ok(Self::Quote(x)),
140            Data::Trade(x) => Ok(Self::Trade(x)),
141            Data::Bar(x) => Ok(Self::Bar(x)),
142            Data::MarkPriceUpdate(x) => Ok(Self::MarkPriceUpdate(x)),
143            Data::IndexPriceUpdate(x) => Ok(Self::IndexPriceUpdate(x)),
144            Data::InstrumentClose(x) => Ok(Self::InstrumentClose(x)),
145            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
146        }
147    }
148}
149
150#[cfg(feature = "ffi")]
151impl From<DataFFI> for Data {
152    fn from(value: DataFFI) -> Self {
153        match value {
154            DataFFI::Delta(x) => Self::Delta(x),
155            DataFFI::Deltas(x) => Self::Deltas(x),
156            DataFFI::Depth10(x) => Self::Depth10(x),
157            DataFFI::Quote(x) => Self::Quote(x),
158            DataFFI::Trade(x) => Self::Trade(x),
159            DataFFI::Bar(x) => Self::Bar(x),
160            DataFFI::MarkPriceUpdate(x) => Self::MarkPriceUpdate(x),
161            DataFFI::IndexPriceUpdate(x) => Self::IndexPriceUpdate(x),
162            DataFFI::InstrumentClose(x) => Self::InstrumentClose(x),
163        }
164    }
165}
166
167impl<'de> Deserialize<'de> for Data {
168    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
169    where
170        D: serde::Deserializer<'de>,
171    {
172        use serde::de::Error;
173        let value = serde_json::Value::deserialize(deserializer)?;
174        let type_name = value
175            .get("type")
176            .and_then(|v| v.as_str())
177            .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
178            .to_string();
179
180        match type_name.as_str() {
181            "OrderBookDelta" => Ok(Self::Delta(
182                serde_json::from_value(value).map_err(D::Error::custom)?,
183            )),
184            "OrderBookDeltas" => Ok(Self::Deltas(
185                serde_json::from_value(value).map_err(D::Error::custom)?,
186            )),
187            "OrderBookDepth10" => Ok(Self::Depth10(
188                serde_json::from_value(value).map_err(D::Error::custom)?,
189            )),
190            "QuoteTick" => Ok(Self::Quote(
191                serde_json::from_value(value).map_err(D::Error::custom)?,
192            )),
193            "TradeTick" => Ok(Self::Trade(
194                serde_json::from_value(value).map_err(D::Error::custom)?,
195            )),
196            "Bar" => Ok(Self::Bar(
197                serde_json::from_value(value).map_err(D::Error::custom)?,
198            )),
199            "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
200                serde_json::from_value(value).map_err(D::Error::custom)?,
201            )),
202            "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
203                serde_json::from_value(value).map_err(D::Error::custom)?,
204            )),
205            "InstrumentClose" => Ok(Self::InstrumentClose(
206                serde_json::from_value(value).map_err(D::Error::custom)?,
207            )),
208            _ => {
209                if let Some(data) =
210                    deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
211                {
212                    Ok(data)
213                } else {
214                    Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
215                }
216            }
217        }
218    }
219}
220
221impl Clone for Data {
222    fn clone(&self) -> Self {
223        match self {
224            Self::Delta(x) => Self::Delta(*x),
225            Self::Deltas(x) => Self::Deltas(x.clone()),
226            Self::Depth10(x) => Self::Depth10(x.clone()),
227            Self::Quote(x) => Self::Quote(*x),
228            Self::Trade(x) => Self::Trade(*x),
229            Self::Bar(x) => Self::Bar(*x),
230            Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
231            Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
232            Self::InstrumentClose(x) => Self::InstrumentClose(*x),
233            Self::Custom(x) => Self::Custom(x.clone()),
234        }
235    }
236}
237
238impl PartialEq for Data {
239    fn eq(&self, other: &Self) -> bool {
240        match (self, other) {
241            (Self::Delta(a), Self::Delta(b)) => a == b,
242            (Self::Deltas(a), Self::Deltas(b)) => a == b,
243            (Self::Depth10(a), Self::Depth10(b)) => a == b,
244            (Self::Quote(a), Self::Quote(b)) => a == b,
245            (Self::Trade(a), Self::Trade(b)) => a == b,
246            (Self::Bar(a), Self::Bar(b)) => a == b,
247            (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
248            (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
249            (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
250            (Self::Custom(a), Self::Custom(b)) => a == b,
251            _ => false,
252        }
253    }
254}
255
256impl Serialize for Data {
257    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
258    where
259        S: serde::Serializer,
260    {
261        match self {
262            Self::Delta(x) => x.serialize(serializer),
263            Self::Deltas(x) => x.serialize(serializer),
264            Self::Depth10(x) => x.serialize(serializer),
265            Self::Quote(x) => x.serialize(serializer),
266            Self::Trade(x) => x.serialize(serializer),
267            Self::Bar(x) => x.serialize(serializer),
268            Self::MarkPriceUpdate(x) => x.serialize(serializer),
269            Self::IndexPriceUpdate(x) => x.serialize(serializer),
270            Self::InstrumentClose(x) => x.serialize(serializer),
271            Self::Custom(x) => x.serialize(serializer),
272        }
273    }
274}
275
276macro_rules! impl_try_from_data {
277    ($variant:ident, $type:ty) => {
278        impl TryFrom<Data> for $type {
279            type Error = ();
280
281            fn try_from(value: Data) -> Result<Self, Self::Error> {
282                match value {
283                    Data::$variant(x) => Ok(x),
284                    _ => Err(()),
285                }
286            }
287        }
288    };
289}
290
291impl TryFrom<Data> for OrderBookDepth10 {
292    type Error = ();
293
294    fn try_from(value: Data) -> Result<Self, Self::Error> {
295        match value {
296            Data::Depth10(x) => Ok(*x),
297            _ => Err(()),
298        }
299    }
300}
301
302impl_try_from_data!(Quote, QuoteTick);
303impl_try_from_data!(Delta, OrderBookDelta);
304impl_try_from_data!(Deltas, OrderBookDeltas_API);
305impl_try_from_data!(Trade, TradeTick);
306impl_try_from_data!(Bar, Bar);
307impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
308impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
309impl_try_from_data!(InstrumentClose, InstrumentClose);
310
311/// Converts a vector of `Data` items to a specific variant type.
312///
313/// Filters and converts the data vector, keeping only items that can be
314/// successfully converted to the target type `T`.
315pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
316    data.into_iter()
317        .filter_map(|d| T::try_from(d).ok())
318        .collect()
319}
320
321impl Data {
322    /// Returns the instrument ID for the data.
323    pub fn instrument_id(&self) -> InstrumentId {
324        match self {
325            Self::Delta(delta) => delta.instrument_id,
326            Self::Deltas(deltas) => deltas.instrument_id,
327            Self::Depth10(depth) => depth.instrument_id,
328            Self::Quote(quote) => quote.instrument_id,
329            Self::Trade(trade) => trade.instrument_id,
330            Self::Bar(bar) => bar.bar_type.instrument_id(),
331            Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
332            Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
333            Self::InstrumentClose(close) => close.instrument_id,
334            Self::Custom(custom) => custom
335                .data_type
336                .identifier()
337                .and_then(|s| InstrumentId::from_str(s).ok())
338                .or_else(|| {
339                    custom
340                        .data_type
341                        .metadata()
342                        .and_then(|m| m.get_str("instrument_id"))
343                        .and_then(|s| InstrumentId::from_str(s).ok())
344                })
345                .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
346        }
347    }
348
349    /// Returns whether the data is a type of order book data.
350    pub fn is_order_book_data(&self) -> bool {
351        matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
352    }
353}
354
355/// Marker trait for types that carry a creation timestamp.
356///
357/// `ts_init` is the moment (UNIX nanoseconds) when this value was first generated or
358/// ingested by Nautilus. It can be used for sequencing, latency measurements,
359/// or monitoring data-pipeline delays.
360pub trait HasTsInit {
361    /// Returns the UNIX timestamp (nanoseconds) when the instance was created.
362    fn ts_init(&self) -> UnixNanos;
363}
364
365/// Trait for data types that have a catalog path prefix.
366pub trait CatalogPathPrefix {
367    /// Returns the path prefix (directory name) for this data type.
368    fn path_prefix() -> &'static str;
369}
370
371/// Macro for implementing [`CatalogPathPrefix`] for data types.
372///
373/// This macro provides a convenient way to implement the trait for multiple types
374/// with their corresponding path prefixes.
375///
376/// # Parameters
377///
378/// - `$type`: The data type to implement the trait for.
379/// - `$path`: The path prefix string for that type.
380#[macro_export]
381macro_rules! impl_catalog_path_prefix {
382    ($type:ty, $path:expr) => {
383        impl $crate::data::CatalogPathPrefix for $type {
384            fn path_prefix() -> &'static str {
385                $path
386            }
387        }
388    };
389}
390
391// Standard implementations for financial data types
392impl_catalog_path_prefix!(QuoteTick, "quotes");
393impl_catalog_path_prefix!(TradeTick, "trades");
394impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
395impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
396impl_catalog_path_prefix!(Bar, "bars");
397impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
398impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
399impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
400
401use crate::instruments::InstrumentAny;
402impl_catalog_path_prefix!(InstrumentAny, "instruments");
403
404impl HasTsInit for Data {
405    fn ts_init(&self) -> UnixNanos {
406        match self {
407            Self::Delta(d) => d.ts_init,
408            Self::Deltas(d) => d.ts_init,
409            Self::Depth10(d) => d.ts_init,
410            Self::Quote(q) => q.ts_init,
411            Self::Trade(t) => t.ts_init,
412            Self::Bar(b) => b.ts_init,
413            Self::MarkPriceUpdate(p) => p.ts_init,
414            Self::IndexPriceUpdate(p) => p.ts_init,
415            Self::InstrumentClose(c) => c.ts_init,
416            Self::Custom(c) => c.data.ts_init(),
417        }
418    }
419}
420
421/// Checks if the data slice is monotonically increasing by initialization timestamp.
422///
423/// Returns `true` if each element's `ts_init` is less than or equal to the next element's `ts_init`.
424pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
425    data.array_windows()
426        .all(|[a, b]| a.ts_init() <= b.ts_init())
427}
428
429impl From<OrderBookDelta> for Data {
430    fn from(value: OrderBookDelta) -> Self {
431        Self::Delta(value)
432    }
433}
434
435impl From<OrderBookDeltas_API> for Data {
436    fn from(value: OrderBookDeltas_API) -> Self {
437        Self::Deltas(value)
438    }
439}
440
441impl From<OrderBookDepth10> for Data {
442    fn from(value: OrderBookDepth10) -> Self {
443        Self::Depth10(Box::new(value))
444    }
445}
446
447impl From<QuoteTick> for Data {
448    fn from(value: QuoteTick) -> Self {
449        Self::Quote(value)
450    }
451}
452
453impl From<TradeTick> for Data {
454    fn from(value: TradeTick) -> Self {
455        Self::Trade(value)
456    }
457}
458
459impl From<Bar> for Data {
460    fn from(value: Bar) -> Self {
461        Self::Bar(value)
462    }
463}
464
465impl From<MarkPriceUpdate> for Data {
466    fn from(value: MarkPriceUpdate) -> Self {
467        Self::MarkPriceUpdate(value)
468    }
469}
470
471impl From<IndexPriceUpdate> for Data {
472    fn from(value: IndexPriceUpdate) -> Self {
473        Self::IndexPriceUpdate(value)
474    }
475}
476
477impl From<InstrumentClose> for Data {
478    fn from(value: InstrumentClose) -> Self {
479        Self::InstrumentClose(value)
480    }
481}
482
483/// Builds a string-only view of a JSON value for use in topic (key=value).
484fn value_to_topic_string(v: &JsonValue) -> String {
485    if let Some(s) = v.as_str() {
486        return s.to_string();
487    }
488
489    if let Some(n) = v.as_u64() {
490        return n.to_string();
491    }
492
493    if let Some(n) = v.as_i64() {
494        return n.to_string();
495    }
496
497    if let Some(b) = v.as_bool() {
498        return b.to_string();
499    }
500
501    if let Some(f) = v.as_f64() {
502        return f.to_string();
503    }
504
505    if v.is_null() {
506        return "null".to_string();
507    }
508    serde_json::to_string(v).unwrap_or_default()
509}
510
511/// Builds the topic suffix from Params (string-only view: key=value joined by ".").
512fn params_to_topic_suffix(params: &Params) -> String {
513    params
514        .iter()
515        .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
516        .collect::<Vec<_>>()
517        .join(".")
518}
519
520/// Represents a data type including metadata.
521#[derive(Clone, Serialize, Deserialize)]
522#[cfg_attr(
523    feature = "python",
524    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
525)]
526#[cfg_attr(
527    feature = "python",
528    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
529)]
530pub struct DataType {
531    type_name: String,
532    metadata: Option<Params>,
533    topic: String,
534    hash: u64,
535    identifier: Option<String>,
536}
537
538impl DataType {
539    /// Creates a new [`DataType`] instance.
540    pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
541        // Precompute topic from type_name + metadata (string-only view for backward compatibility)
542        let topic = if let Some(ref meta) = metadata {
543            if meta.is_empty() {
544                type_name.to_string()
545            } else {
546                format!("{type_name}.{}", params_to_topic_suffix(meta))
547            }
548        } else {
549            type_name.to_string()
550        };
551
552        // Precompute hash
553        let mut hasher = std::collections::hash_map::DefaultHasher::new();
554        topic.hash(&mut hasher);
555
556        Self {
557            type_name: type_name.to_owned(),
558            metadata,
559            topic,
560            hash: hasher.finish(),
561            identifier,
562        }
563    }
564
565    /// Creates a [`DataType`] from persisted parts (type_name, topic, metadata).
566    /// Hash is recomputed from topic. Use when restoring from legacy data_type column.
567    /// Identifier is set to None.
568    pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
569        let mut hasher = std::collections::hash_map::DefaultHasher::new();
570        topic.hash(&mut hasher);
571        Self {
572            type_name: type_name.to_owned(),
573            metadata,
574            topic: topic.to_owned(),
575            hash: hasher.finish(),
576            identifier: None,
577        }
578    }
579
580    /// Serializes to JSON for persistence (type_name, metadata, identifier; no topic, no hash).
581    ///
582    /// # Errors
583    ///
584    /// Returns a JSON serialization error if the data cannot be serialized.
585    pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
586        let mut map = serde_json::Map::new();
587        map.insert(
588            "type_name".to_string(),
589            serde_json::Value::String(self.type_name.clone()),
590        );
591        map.insert(
592            "metadata".to_string(),
593            self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
594                serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
595            }),
596        );
597
598        if let Some(ref id) = self.identifier {
599            map.insert(
600                "identifier".to_string(),
601                serde_json::Value::String(id.clone()),
602            );
603        }
604        serde_json::to_string(&serde_json::Value::Object(map))
605    }
606
607    /// Deserializes from JSON produced by `to_persistence_json`.
608    /// Accepts legacy JSON with `topic` (ignored); topic is rebuilt from type_name + metadata.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the string is not valid JSON or missing required fields.
613    pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
614        let value: serde_json::Value =
615            serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
616        let obj = value
617            .as_object()
618            .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
619        let type_name = obj
620            .get("type_name")
621            .and_then(|v| v.as_str())
622            .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
623            .to_string();
624        let metadata = obj.get("metadata").and_then(|m| {
625            if m.is_null() {
626                None
627            } else {
628                let p: Params = serde_json::from_value(m.clone()).ok()?;
629                if p.is_empty() { None } else { Some(p) }
630            }
631        });
632        let identifier = obj
633            .get("identifier")
634            .and_then(|v| v.as_str())
635            .map(String::from);
636        Ok(Self::new(&type_name, metadata, identifier))
637    }
638
639    /// Returns the type name for the data type.
640    pub fn type_name(&self) -> &str {
641        self.type_name.as_str()
642    }
643
644    /// Returns the metadata for the data type.
645    pub fn metadata(&self) -> Option<&Params> {
646        self.metadata.as_ref()
647    }
648
649    /// Returns a string representation of the metadata.
650    pub fn metadata_str(&self) -> String {
651        self.metadata.as_ref().map_or_else(
652            || "null".to_string(),
653            |metadata| to_string(metadata).unwrap_or_default(),
654        )
655    }
656
657    /// Returns metadata as a string-only map (e.g. for Arrow schema metadata).
658    pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
659        self.metadata.as_ref().map(|p| {
660            p.iter()
661                .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
662                .collect()
663        })
664    }
665
666    /// Returns the precomputed hash for this data type.
667    pub fn precomputed_hash(&self) -> u64 {
668        self.hash
669    }
670
671    /// Returns the messaging topic for the data type.
672    pub fn topic(&self) -> &str {
673        self.topic.as_str()
674    }
675
676    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
677    pub fn identifier(&self) -> Option<&str> {
678        self.identifier.as_deref()
679    }
680
681    /// Returns an [`Option<InstrumentId>`] parsed from the metadata.
682    ///
683    /// # Panics
684    ///
685    /// This function panics if:
686    /// - There is no metadata.
687    /// - The `instrument_id` value contained in the metadata is invalid.
688    pub fn instrument_id(&self) -> Option<InstrumentId> {
689        let metadata = self.metadata.as_ref().expect("metadata was `None`");
690        let instrument_id = metadata.get_str("instrument_id")?;
691        Some(
692            InstrumentId::from_str(instrument_id)
693                .expect("Invalid `InstrumentId` for 'instrument_id'"),
694        )
695    }
696
697    /// Returns an [`Option<Venue>`] parsed from the metadata.
698    ///
699    /// # Panics
700    ///
701    /// This function panics if:
702    /// - There is no metadata.
703    /// - The `venue` value contained in the metadata is invalid.
704    pub fn venue(&self) -> Option<Venue> {
705        let metadata = self.metadata.as_ref().expect("metadata was `None`");
706        let venue_str = metadata.get_str("venue")?;
707        Some(Venue::from(venue_str))
708    }
709
710    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `start` field.
711    ///
712    /// # Panics
713    ///
714    /// This function panics if:
715    /// - There is no metadata.
716    /// - The `start` value contained in the metadata is invalid.
717    pub fn start(&self) -> Option<UnixNanos> {
718        let metadata = self.metadata.as_ref()?;
719        let start_str = metadata.get_str("start")?;
720        Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
721    }
722
723    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `end` field.
724    ///
725    /// # Panics
726    ///
727    /// This function panics if:
728    /// - There is no metadata.
729    /// - The `end` value contained in the metadata is invalid.
730    pub fn end(&self) -> Option<UnixNanos> {
731        let metadata = self.metadata.as_ref()?;
732        let end_str = metadata.get_str("end")?;
733        Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
734    }
735
736    /// Returns an [`Option<usize>`] parsed from the metadata `limit` field.
737    ///
738    /// # Panics
739    ///
740    /// This function panics if:
741    /// - There is no metadata.
742    /// - The `limit` value contained in the metadata is invalid.
743    pub fn limit(&self) -> Option<usize> {
744        let metadata = self.metadata.as_ref()?;
745        metadata.get_usize("limit").or_else(|| {
746            metadata
747                .get_str("limit")
748                .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
749        })
750    }
751}
752
753impl PartialEq for DataType {
754    fn eq(&self, other: &Self) -> bool {
755        self.topic == other.topic
756    }
757}
758
759impl Eq for DataType {}
760
761impl PartialOrd for DataType {
762    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
763        Some(self.cmp(other))
764    }
765}
766
767impl Ord for DataType {
768    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
769        self.topic.cmp(&other.topic)
770    }
771}
772
773impl Hash for DataType {
774    fn hash<H: Hasher>(&self, state: &mut H) {
775        self.hash.hash(state);
776    }
777}
778
779impl Display for DataType {
780    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781        write!(f, "{}", self.topic)
782    }
783}
784
785impl Debug for DataType {
786    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
787        write!(
788            f,
789            "DataType(type_name={}, metadata={:?}, identifier={:?})",
790            self.type_name, self.metadata, self.identifier
791        )
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use std::hash::DefaultHasher;
798
799    use rstest::*;
800    use serde_json::json;
801
802    use super::*;
803
804    fn params_from_json(value: serde_json::Value) -> Params {
805        serde_json::from_value(value).expect("valid Params JSON")
806    }
807
808    #[rstest]
809    fn test_data_type_creation_with_metadata() {
810        let metadata = Some(params_from_json(
811            json!({"key1": "value1", "key2": "value2"}),
812        ));
813        let data_type = DataType::new("ExampleType", metadata.clone(), None);
814
815        assert_eq!(data_type.type_name(), "ExampleType");
816        assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
817        assert_eq!(data_type.metadata(), metadata.as_ref());
818    }
819
820    #[rstest]
821    fn test_data_type_creation_without_metadata() {
822        let data_type = DataType::new("ExampleType", None, None);
823
824        assert_eq!(data_type.type_name(), "ExampleType");
825        assert_eq!(data_type.topic(), "ExampleType");
826        assert_eq!(data_type.metadata(), None);
827    }
828
829    #[rstest]
830    fn test_data_type_equality() {
831        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
832        let metadata2 = Some(params_from_json(json!({"key1": "value1"})));
833
834        let data_type1 = DataType::new("ExampleType", metadata1, None);
835        let data_type2 = DataType::new("ExampleType", metadata2, None);
836
837        assert_eq!(data_type1, data_type2);
838    }
839
840    #[rstest]
841    fn test_data_type_inequality() {
842        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
843        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
844
845        let data_type1 = DataType::new("ExampleType", metadata1, None);
846        let data_type2 = DataType::new("ExampleType", metadata2, None);
847
848        assert_ne!(data_type1, data_type2);
849    }
850
851    #[rstest]
852    fn test_data_type_ordering() {
853        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
854        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
855
856        let data_type1 = DataType::new("ExampleTypeA", metadata1, None);
857        let data_type2 = DataType::new("ExampleTypeB", metadata2, None);
858
859        assert!(data_type1 < data_type2);
860    }
861
862    #[rstest]
863    fn test_data_type_hash() {
864        let metadata = Some(params_from_json(json!({"key1": "value1"})));
865
866        let data_type1 = DataType::new("ExampleType", metadata.clone(), None);
867        let data_type2 = DataType::new("ExampleType", metadata, None);
868
869        let mut hasher1 = DefaultHasher::new();
870        data_type1.hash(&mut hasher1);
871        let hash1 = hasher1.finish();
872
873        let mut hasher2 = DefaultHasher::new();
874        data_type2.hash(&mut hasher2);
875        let hash2 = hasher2.finish();
876
877        assert_eq!(hash1, hash2);
878    }
879
880    #[rstest]
881    fn test_data_type_display() {
882        let metadata = Some(params_from_json(json!({"key1": "value1"})));
883        let data_type = DataType::new("ExampleType", metadata, None);
884
885        assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
886    }
887
888    #[rstest]
889    fn test_data_type_debug() {
890        let metadata = Some(params_from_json(json!({"key1": "value1"})));
891        let data_type = DataType::new("ExampleType", metadata.clone(), None);
892
893        assert_eq!(
894            format!("{data_type:?}"),
895            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)")
896        );
897    }
898
899    #[rstest]
900    fn test_parse_instrument_id_from_metadata() {
901        let instrument_id_str = "MSFT.XNAS";
902        let metadata = Some(params_from_json(
903            json!({"instrument_id": instrument_id_str}),
904        ));
905        let data_type = DataType::new("InstrumentAny", metadata, None);
906
907        assert_eq!(
908            data_type.instrument_id().unwrap(),
909            InstrumentId::from_str(instrument_id_str).unwrap()
910        );
911    }
912
913    #[rstest]
914    fn test_parse_venue_from_metadata() {
915        let venue_str = "BINANCE";
916        let metadata = Some(params_from_json(json!({"venue": venue_str})));
917        let data_type = DataType::new(stringify!(InstrumentAny), metadata, None);
918
919        assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
920    }
921
922    #[rstest]
923    fn test_parse_start_from_metadata() {
924        let start_ns = 1600054595844758000;
925        let metadata = Some(params_from_json(json!({"start": start_ns.to_string()})));
926        let data_type = DataType::new(stringify!(TradeTick), metadata, None);
927
928        assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns),);
929    }
930
931    #[rstest]
932    fn test_parse_end_from_metadata() {
933        let end_ns = 1720954595844758000;
934        let metadata = Some(params_from_json(json!({"end": end_ns.to_string()})));
935        let data_type = DataType::new(stringify!(TradeTick), metadata, None);
936
937        assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns),);
938    }
939
940    #[rstest]
941    fn test_parse_limit_from_metadata() {
942        let limit = 1000;
943        let metadata = Some(params_from_json(json!({"limit": limit})));
944        let data_type = DataType::new(stringify!(TradeTick), metadata, None);
945
946        assert_eq!(data_type.limit().unwrap(), limit);
947    }
948
949    #[rstest]
950    fn test_data_type_persistence_json_with_identifier() {
951        let data_type = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
952        let json = data_type.to_persistence_json().unwrap();
953        assert!(!json.contains("topic"));
954        assert!(json.contains("\"identifier\":\"venue//symbol\""));
955        let restored = DataType::from_persistence_json(&json).unwrap();
956        assert_eq!(restored.type_name(), "MyCustomType");
957        assert_eq!(restored.identifier(), Some("venue//symbol"));
958        assert_eq!(restored.topic(), "MyCustomType");
959    }
960
961    #[rstest]
962    fn test_data_type_identifier_getter() {
963        let data_type = DataType::new("T", None, Some("id".to_string()));
964        assert_eq!(data_type.identifier(), Some("id"));
965        let data_type_no_id = DataType::new("T", None, None);
966        assert_eq!(data_type_no_id.identifier(), None);
967    }
968}