Skip to main content

nautilus_model/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41    fmt::{Debug, Display},
42    hash::{Hash, Hasher},
43    str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50// Re-exports
51#[rustfmt::skip]  // Keep these grouped
52pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58    CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62    get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70    BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71    YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77pub use registry::{
78    ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, deserialize_custom_from_json,
79    encode_custom_to_arrow, ensure_arrow_registered, ensure_json_deserializer_registered,
80    get_arrow_schema, register_arrow, register_json_deserializer,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84    PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85    ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86    register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use status::InstrumentStatus;
89pub use trade::TradeTick;
90
91use crate::identifiers::{InstrumentId, Venue};
/// A built-in Nautilus data type.
///
/// Not recommended for storing large amounts of data, as the largest variant is significantly
/// larger (10x) than the smallest.
///
/// Only `Debug` is derived; `Clone`, `PartialEq`, `Serialize` and `Deserialize` are
/// implemented by hand in this module.
#[derive(Debug)]
pub enum Data {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a heap-allocated [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>), // This variant is significantly larger
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate), // TODO: Rename to MarkPrice once Cython gone
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate), // TODO: Rename to IndexPrice once Cython gone
    /// Wraps an [`InstrumentStatus`].
    InstrumentStatus(InstrumentStatus),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
    /// Wraps user-defined [`CustomData`].
    Custom(CustomData),
}
110
/// A C-compatible representation of [`Data`] for FFI.
///
/// This enum mirrors the variants of [`Data`] with two exceptions: it has no
/// `InstrumentStatus` variant and no `Custom` variant (the latter is not FFI-safe).
/// Converting either of those with `DataFFI::try_from` returns an error.
#[cfg(feature = "ffi")]
#[repr(C)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
// NOTE(review): every name declared in this enum is already camel case, so this allow
// may be unnecessary — confirm before removing.
#[allow(non_camel_case_types)]
pub enum DataFFI {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a heap-allocated [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>),
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate),
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
}
130
#[cfg(feature = "ffi")]
impl TryFrom<Data> for DataFFI {
    type Error = anyhow::Error;

    /// Moves the payload of `value` into the equally named [`DataFFI`] variant.
    ///
    /// # Errors
    ///
    /// Returns an error for `Data::InstrumentStatus` and `Data::Custom`, which have no
    /// counterpart in [`DataFFI`].
    fn try_from(value: Data) -> Result<Self, Self::Error> {
        let converted = match value {
            Data::Delta(delta) => Self::Delta(delta),
            Data::Deltas(deltas) => Self::Deltas(deltas),
            Data::Depth10(depth) => Self::Depth10(depth),
            Data::Quote(quote) => Self::Quote(quote),
            Data::Trade(trade) => Self::Trade(trade),
            Data::Bar(bar) => Self::Bar(bar),
            Data::MarkPriceUpdate(mark_price) => Self::MarkPriceUpdate(mark_price),
            Data::IndexPriceUpdate(index_price) => Self::IndexPriceUpdate(index_price),
            Data::InstrumentClose(close) => Self::InstrumentClose(close),
            // The two variants below cannot be represented on the FFI side.
            Data::InstrumentStatus(_) => {
                anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
            }
            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
        };

        Ok(converted)
    }
}
153
#[cfg(feature = "ffi")]
impl From<DataFFI> for Data {
    /// Moves the payload of an FFI value into the equally named [`Data`] variant.
    ///
    /// This direction is infallible because every [`DataFFI`] variant exists on [`Data`].
    fn from(value: DataFFI) -> Self {
        match value {
            DataFFI::Delta(delta) => Data::Delta(delta),
            DataFFI::Deltas(deltas) => Data::Deltas(deltas),
            DataFFI::Depth10(depth) => Data::Depth10(depth),
            DataFFI::Quote(quote) => Data::Quote(quote),
            DataFFI::Trade(trade) => Data::Trade(trade),
            DataFFI::Bar(bar) => Data::Bar(bar),
            DataFFI::MarkPriceUpdate(mark_price) => Data::MarkPriceUpdate(mark_price),
            DataFFI::IndexPriceUpdate(index_price) => Data::IndexPriceUpdate(index_price),
            DataFFI::InstrumentClose(close) => Data::InstrumentClose(close),
        }
    }
}
170
171impl<'de> Deserialize<'de> for Data {
172    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173    where
174        D: serde::Deserializer<'de>,
175    {
176        use serde::de::Error;
177        let value = serde_json::Value::deserialize(deserializer)?;
178        let type_name = value
179            .get("type")
180            .and_then(|v| v.as_str())
181            .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
182            .to_string();
183
184        match type_name.as_str() {
185            "OrderBookDelta" => Ok(Self::Delta(
186                serde_json::from_value(value).map_err(D::Error::custom)?,
187            )),
188            "OrderBookDeltas" => Ok(Self::Deltas(
189                serde_json::from_value(value).map_err(D::Error::custom)?,
190            )),
191            "OrderBookDepth10" => Ok(Self::Depth10(
192                serde_json::from_value(value).map_err(D::Error::custom)?,
193            )),
194            "QuoteTick" => Ok(Self::Quote(
195                serde_json::from_value(value).map_err(D::Error::custom)?,
196            )),
197            "TradeTick" => Ok(Self::Trade(
198                serde_json::from_value(value).map_err(D::Error::custom)?,
199            )),
200            "Bar" => Ok(Self::Bar(
201                serde_json::from_value(value).map_err(D::Error::custom)?,
202            )),
203            "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
204                serde_json::from_value(value).map_err(D::Error::custom)?,
205            )),
206            "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
207                serde_json::from_value(value).map_err(D::Error::custom)?,
208            )),
209            "InstrumentStatus" => Ok(Self::InstrumentStatus(
210                serde_json::from_value(value).map_err(D::Error::custom)?,
211            )),
212            "InstrumentClose" => Ok(Self::InstrumentClose(
213                serde_json::from_value(value).map_err(D::Error::custom)?,
214            )),
215            _ => {
216                if let Some(data) =
217                    deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
218                {
219                    Ok(data)
220                } else {
221                    Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
222                }
223            }
224        }
225    }
226}
227
228impl Clone for Data {
229    fn clone(&self) -> Self {
230        match self {
231            Self::Delta(x) => Self::Delta(*x),
232            Self::Deltas(x) => Self::Deltas(x.clone()),
233            Self::Depth10(x) => Self::Depth10(x.clone()),
234            Self::Quote(x) => Self::Quote(*x),
235            Self::Trade(x) => Self::Trade(*x),
236            Self::Bar(x) => Self::Bar(*x),
237            Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
238            Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
239            Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
240            Self::InstrumentClose(x) => Self::InstrumentClose(*x),
241            Self::Custom(x) => Self::Custom(x.clone()),
242        }
243    }
244}
245
246impl PartialEq for Data {
247    fn eq(&self, other: &Self) -> bool {
248        match (self, other) {
249            (Self::Delta(a), Self::Delta(b)) => a == b,
250            (Self::Deltas(a), Self::Deltas(b)) => a == b,
251            (Self::Depth10(a), Self::Depth10(b)) => a == b,
252            (Self::Quote(a), Self::Quote(b)) => a == b,
253            (Self::Trade(a), Self::Trade(b)) => a == b,
254            (Self::Bar(a), Self::Bar(b)) => a == b,
255            (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
256            (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
257            (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
258            (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
259            (Self::Custom(a), Self::Custom(b)) => a == b,
260            _ => false,
261        }
262    }
263}
264
265impl Serialize for Data {
266    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
267    where
268        S: serde::Serializer,
269    {
270        match self {
271            Self::Delta(x) => x.serialize(serializer),
272            Self::Deltas(x) => x.serialize(serializer),
273            Self::Depth10(x) => x.serialize(serializer),
274            Self::Quote(x) => x.serialize(serializer),
275            Self::Trade(x) => x.serialize(serializer),
276            Self::Bar(x) => x.serialize(serializer),
277            Self::MarkPriceUpdate(x) => x.serialize(serializer),
278            Self::IndexPriceUpdate(x) => x.serialize(serializer),
279            Self::InstrumentStatus(x) => x.serialize(serializer),
280            Self::InstrumentClose(x) => x.serialize(serializer),
281            Self::Custom(x) => x.serialize(serializer),
282        }
283    }
284}
285
/// Implements `TryFrom<Data>` for a payload type.
///
/// The first argument names the `Data` variant, the second the type that variant wraps.
/// The generated conversion returns the payload for that variant and `Err(())` for any other.
macro_rules! impl_try_from_data {
    ($data_variant:ident, $payload:ty) => {
        impl TryFrom<Data> for $payload {
            type Error = ();

            fn try_from(value: Data) -> Result<Self, Self::Error> {
                if let Data::$data_variant(inner) = value {
                    Ok(inner)
                } else {
                    Err(())
                }
            }
        }
    };
}
300
301impl TryFrom<Data> for OrderBookDepth10 {
302    type Error = ();
303
304    fn try_from(value: Data) -> Result<Self, Self::Error> {
305        match value {
306            Data::Depth10(x) => Ok(*x),
307            _ => Err(()),
308        }
309    }
310}
311
// Fallible extraction of a concrete payload from `Data`: each invocation pairs a `Data`
// variant (first argument) with the type it wraps (second argument). `OrderBookDepth10` is
// implemented by hand because its variant stores a `Box`, and there is no invocation for
// the `Custom` variant.
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);
impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
impl_try_from_data!(InstrumentStatus, InstrumentStatus);
impl_try_from_data!(InstrumentClose, InstrumentClose);
321
322/// Converts a vector of `Data` items to a specific variant type.
323///
324/// Filters and converts the data vector, keeping only items that can be
325/// successfully converted to the target type `T`.
326#[must_use]
327pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
328    data.into_iter()
329        .filter_map(|d| T::try_from(d).ok())
330        .collect()
331}
332
333impl Data {
334    /// Returns the instrument ID for the data.
335    #[must_use]
336    pub fn instrument_id(&self) -> InstrumentId {
337        match self {
338            Self::Delta(delta) => delta.instrument_id,
339            Self::Deltas(deltas) => deltas.instrument_id,
340            Self::Depth10(depth) => depth.instrument_id,
341            Self::Quote(quote) => quote.instrument_id,
342            Self::Trade(trade) => trade.instrument_id,
343            Self::Bar(bar) => bar.bar_type.instrument_id(),
344            Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
345            Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
346            Self::InstrumentStatus(status) => status.instrument_id,
347            Self::InstrumentClose(close) => close.instrument_id,
348            Self::Custom(custom) => custom
349                .data_type
350                .identifier()
351                .and_then(|s| InstrumentId::from_str(s).ok())
352                .or_else(|| {
353                    custom
354                        .data_type
355                        .metadata()
356                        .and_then(|m| m.get_str("instrument_id"))
357                        .and_then(|s| InstrumentId::from_str(s).ok())
358                })
359                .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
360        }
361    }
362
363    /// Returns whether the data is a type of order book data.
364    #[must_use]
365    pub fn is_order_book_data(&self) -> bool {
366        matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
367    }
368}
369
/// Trait for types that carry a creation timestamp.
///
/// `ts_init` is the moment (UNIX nanoseconds) when this value was first generated or
/// ingested by Nautilus. It can be used for sequencing, latency measurements,
/// or monitoring data-pipeline delays.
///
/// Implementors must provide [`HasTsInit::ts_init`]; the trait has no default methods.
pub trait HasTsInit {
    /// Returns the UNIX timestamp (nanoseconds) when the instance was created.
    fn ts_init(&self) -> UnixNanos;
}
379
/// Trait for data types that have a catalog path prefix.
///
/// The prefix is a property of the type rather than of a value, so the method takes no
/// receiver. Implementations are normally generated with `impl_catalog_path_prefix!`.
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    fn path_prefix() -> &'static str;
}
385
/// Implements [`CatalogPathPrefix`] for a data type.
///
/// Expands to an `impl` whose `path_prefix()` returns the given expression, which saves
/// writing the same three-line impl for every type.
///
/// # Parameters
///
/// - `$data_type`: The type that receives the implementation.
/// - `$prefix`: The expression returned as the path prefix; it must evaluate to a
///   `&'static str`.
#[macro_export]
macro_rules! impl_catalog_path_prefix {
    ($data_type:ty, $prefix:expr) => {
        impl $crate::data::CatalogPathPrefix for $data_type {
            fn path_prefix() -> &'static str {
                $prefix
            }
        }
    };
}
405
// Standard implementations for financial data types.
//
// Each line fixes the string that `path_prefix()` returns for the named type.
// NOTE(review): most prefixes are plural, but `funding_rate_update` and `instrument_status`
// are singular. These strings are returned verbatim to callers, so confirm how they are
// used before normalizing them.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

// Imported here, next to its only use in this section.
use crate::instruments::InstrumentAny;
impl_catalog_path_prefix!(InstrumentAny, "instruments");
420
421impl HasTsInit for Data {
422    fn ts_init(&self) -> UnixNanos {
423        match self {
424            Self::Delta(d) => d.ts_init,
425            Self::Deltas(d) => d.ts_init,
426            Self::Depth10(d) => d.ts_init,
427            Self::Quote(q) => q.ts_init,
428            Self::Trade(t) => t.ts_init,
429            Self::Bar(b) => b.ts_init,
430            Self::MarkPriceUpdate(p) => p.ts_init,
431            Self::IndexPriceUpdate(p) => p.ts_init,
432            Self::InstrumentStatus(s) => s.ts_init,
433            Self::InstrumentClose(c) => c.ts_init,
434            Self::Custom(c) => c.data.ts_init(),
435        }
436    }
437}
438
439/// Checks if the data slice is monotonically increasing by initialization timestamp.
440///
441/// Returns `true` if each element's `ts_init` is less than or equal to the next element's `ts_init`.
442pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
443    data.array_windows()
444        .all(|[a, b]| a.ts_init() <= b.ts_init())
445}
446
447impl From<OrderBookDelta> for Data {
448    fn from(value: OrderBookDelta) -> Self {
449        Self::Delta(value)
450    }
451}
452
453impl From<OrderBookDeltas_API> for Data {
454    fn from(value: OrderBookDeltas_API) -> Self {
455        Self::Deltas(value)
456    }
457}
458
459impl From<OrderBookDepth10> for Data {
460    fn from(value: OrderBookDepth10) -> Self {
461        Self::Depth10(Box::new(value))
462    }
463}
464
465impl From<QuoteTick> for Data {
466    fn from(value: QuoteTick) -> Self {
467        Self::Quote(value)
468    }
469}
470
471impl From<TradeTick> for Data {
472    fn from(value: TradeTick) -> Self {
473        Self::Trade(value)
474    }
475}
476
477impl From<Bar> for Data {
478    fn from(value: Bar) -> Self {
479        Self::Bar(value)
480    }
481}
482
483impl From<MarkPriceUpdate> for Data {
484    fn from(value: MarkPriceUpdate) -> Self {
485        Self::MarkPriceUpdate(value)
486    }
487}
488
489impl From<IndexPriceUpdate> for Data {
490    fn from(value: IndexPriceUpdate) -> Self {
491        Self::IndexPriceUpdate(value)
492    }
493}
494
495impl From<InstrumentStatus> for Data {
496    fn from(value: InstrumentStatus) -> Self {
497        Self::InstrumentStatus(value)
498    }
499}
500
501impl From<InstrumentClose> for Data {
502    fn from(value: InstrumentClose) -> Self {
503        Self::InstrumentClose(value)
504    }
505}
506
507/// Builds a string-only view of a JSON value for use in topic (key=value).
508fn value_to_topic_string(v: &JsonValue) -> String {
509    if let Some(s) = v.as_str() {
510        return s.to_string();
511    }
512
513    if let Some(n) = v.as_u64() {
514        return n.to_string();
515    }
516
517    if let Some(n) = v.as_i64() {
518        return n.to_string();
519    }
520
521    if let Some(b) = v.as_bool() {
522        return b.to_string();
523    }
524
525    if let Some(f) = v.as_f64() {
526        return f.to_string();
527    }
528
529    if v.is_null() {
530        return "null".to_string();
531    }
532    serde_json::to_string(v).unwrap_or_default()
533}
534
535/// Builds the topic suffix from Params (string-only view: key=value joined by ".").
536fn params_to_topic_suffix(params: &Params) -> String {
537    params
538        .iter()
539        .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
540        .collect::<Vec<_>>()
541        .join(".")
542}
543
/// Represents a data type including metadata.
///
/// Equality, ordering and `Display` are based solely on the precomputed `topic`, and
/// hashing uses the cached `hash`; `identifier` takes no part in any of them.
#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct DataType {
    /// Name of the data type.
    type_name: String,
    /// Optional metadata parameters attached to the type.
    metadata: Option<Params>,
    /// Messaging topic, precomputed at construction.
    topic: String,
    /// Hash of `topic`, precomputed at construction.
    hash: u64,
    /// Optional catalog path identifier.
    identifier: Option<String>,
}
561
562impl DataType {
563    /// Creates a new [`DataType`] instance.
564    #[must_use]
565    pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
566        // Precompute topic from type_name + metadata (string-only view for backward compatibility)
567        let topic = if let Some(ref meta) = metadata {
568            if meta.is_empty() {
569                type_name.to_string()
570            } else {
571                format!("{type_name}.{}", params_to_topic_suffix(meta))
572            }
573        } else {
574            type_name.to_string()
575        };
576
577        // Precompute hash
578        let mut hasher = std::collections::hash_map::DefaultHasher::new();
579        topic.hash(&mut hasher);
580
581        Self {
582            type_name: type_name.to_owned(),
583            metadata,
584            topic,
585            hash: hasher.finish(),
586            identifier,
587        }
588    }
589
590    /// Creates a [`DataType`] from persisted parts (`type_name`, topic, metadata).
591    /// Hash is recomputed from topic. Use when restoring from legacy `data_type` column.
592    /// Identifier is set to None.
593    #[must_use]
594    pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
595        let mut hasher = std::collections::hash_map::DefaultHasher::new();
596        topic.hash(&mut hasher);
597        Self {
598            type_name: type_name.to_owned(),
599            metadata,
600            topic: topic.to_owned(),
601            hash: hasher.finish(),
602            identifier: None,
603        }
604    }
605
606    /// Serializes to JSON for persistence (`type_name`, metadata, identifier; no topic, no hash).
607    ///
608    /// # Errors
609    ///
610    /// Returns a JSON serialization error if the data cannot be serialized.
611    pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
612        let mut map = serde_json::Map::new();
613        map.insert(
614            "type_name".to_string(),
615            serde_json::Value::String(self.type_name.clone()),
616        );
617        map.insert(
618            "metadata".to_string(),
619            self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
620                serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
621            }),
622        );
623
624        if let Some(ref id) = self.identifier {
625            map.insert(
626                "identifier".to_string(),
627                serde_json::Value::String(id.clone()),
628            );
629        }
630        serde_json::to_string(&serde_json::Value::Object(map))
631    }
632
633    /// Deserializes from JSON produced by `to_persistence_json`.
634    /// Accepts legacy JSON with `topic` (ignored); topic is rebuilt from `type_name` + metadata.
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if the string is not valid JSON or missing required fields.
639    pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
640        let value: serde_json::Value =
641            serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
642        let obj = value
643            .as_object()
644            .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
645        let type_name = obj
646            .get("type_name")
647            .and_then(|v| v.as_str())
648            .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
649            .to_string();
650        let metadata = obj.get("metadata").and_then(|m| {
651            if m.is_null() {
652                None
653            } else {
654                let p: Params = serde_json::from_value(m.clone()).ok()?;
655                if p.is_empty() { None } else { Some(p) }
656            }
657        });
658        let identifier = obj
659            .get("identifier")
660            .and_then(|v| v.as_str())
661            .map(String::from);
662        Ok(Self::new(&type_name, metadata, identifier))
663    }
664
665    /// Returns the type name for the data type.
666    #[must_use]
667    pub fn type_name(&self) -> &str {
668        self.type_name.as_str()
669    }
670
671    /// Returns the metadata for the data type.
672    #[must_use]
673    pub fn metadata(&self) -> Option<&Params> {
674        self.metadata.as_ref()
675    }
676
677    /// Returns a string representation of the metadata.
678    #[must_use]
679    pub fn metadata_str(&self) -> String {
680        self.metadata.as_ref().map_or_else(
681            || "null".to_string(),
682            |metadata| to_string(metadata).unwrap_or_default(),
683        )
684    }
685
686    /// Returns metadata as a string-only map (e.g. for Arrow schema metadata).
687    #[must_use]
688    pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
689        self.metadata.as_ref().map(|p| {
690            p.iter()
691                .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
692                .collect()
693        })
694    }
695
696    /// Returns the precomputed hash for this data type.
697    #[must_use]
698    pub fn precomputed_hash(&self) -> u64 {
699        self.hash
700    }
701
702    /// Returns the messaging topic for the data type.
703    #[must_use]
704    pub fn topic(&self) -> &str {
705        self.topic.as_str()
706    }
707
708    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
709    #[must_use]
710    pub fn identifier(&self) -> Option<&str> {
711        self.identifier.as_deref()
712    }
713
714    /// Returns an [`Option<InstrumentId>`] parsed from the metadata.
715    ///
716    /// # Panics
717    ///
718    /// This function panics if:
719    /// - There is no metadata.
720    /// - The `instrument_id` value contained in the metadata is invalid.
721    #[must_use]
722    pub fn instrument_id(&self) -> Option<InstrumentId> {
723        let metadata = self.metadata.as_ref().expect("metadata was `None`");
724        let instrument_id = metadata.get_str("instrument_id")?;
725        Some(
726            InstrumentId::from_str(instrument_id)
727                .expect("Invalid `InstrumentId` for 'instrument_id'"),
728        )
729    }
730
731    /// Returns an [`Option<Venue>`] parsed from the metadata.
732    ///
733    /// # Panics
734    ///
735    /// This function panics if:
736    /// - There is no metadata.
737    /// - The `venue` value contained in the metadata is invalid.
738    #[must_use]
739    pub fn venue(&self) -> Option<Venue> {
740        let metadata = self.metadata.as_ref().expect("metadata was `None`");
741        let venue_str = metadata.get_str("venue")?;
742        Some(Venue::from(venue_str))
743    }
744
745    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `start` field.
746    ///
747    /// # Panics
748    ///
749    /// This function panics if:
750    /// - There is no metadata.
751    /// - The `start` value contained in the metadata is invalid.
752    #[must_use]
753    pub fn start(&self) -> Option<UnixNanos> {
754        let metadata = self.metadata.as_ref()?;
755        let start_str = metadata.get_str("start")?;
756        Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
757    }
758
759    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `end` field.
760    ///
761    /// # Panics
762    ///
763    /// This function panics if:
764    /// - There is no metadata.
765    /// - The `end` value contained in the metadata is invalid.
766    #[must_use]
767    pub fn end(&self) -> Option<UnixNanos> {
768        let metadata = self.metadata.as_ref()?;
769        let end_str = metadata.get_str("end")?;
770        Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
771    }
772
773    /// Returns an [`Option<usize>`] parsed from the metadata `limit` field.
774    ///
775    /// # Panics
776    ///
777    /// This function panics if:
778    /// - There is no metadata.
779    /// - The `limit` value contained in the metadata is invalid.
780    #[must_use]
781    pub fn limit(&self) -> Option<usize> {
782        let metadata = self.metadata.as_ref()?;
783        metadata.get_usize("limit").or_else(|| {
784            metadata
785                .get_str("limit")
786                .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
787        })
788    }
789}
790
791impl PartialEq for DataType {
792    fn eq(&self, other: &Self) -> bool {
793        self.topic == other.topic
794    }
795}
796
797impl Eq for DataType {}
798
799impl PartialOrd for DataType {
800    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
801        Some(self.cmp(other))
802    }
803}
804
805impl Ord for DataType {
806    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
807        self.topic.cmp(&other.topic)
808    }
809}
810
811impl Hash for DataType {
812    fn hash<H: Hasher>(&self, state: &mut H) {
813        self.hash.hash(state);
814    }
815}
816
817impl Display for DataType {
818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819        write!(f, "{}", self.topic)
820    }
821}
822
823impl Debug for DataType {
824    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825        write!(
826            f,
827            "DataType(type_name={}, metadata={:?}, identifier={:?})",
828            self.type_name, self.metadata, self.identifier
829        )
830    }
831}
832
#[cfg(test)]
mod tests {
    use std::hash::DefaultHasher;

    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Deserializes a JSON value into [`Params`].
    ///
    /// Panics with "valid Params JSON" when the value cannot be deserialized.
    fn params_from_json(value: serde_json::Value) -> Params {
        serde_json::from_value(value).expect("valid Params JSON")
    }

    /// Feeds `data_type` into a fresh [`DefaultHasher`] and returns the resulting digest.
    fn hash_of(data_type: &DataType) -> u64 {
        let mut hasher = DefaultHasher::new();
        data_type.hash(&mut hasher);
        hasher.finish()
    }

    #[rstest]
    fn test_data_type_creation_with_metadata() {
        let params = params_from_json(json!({"key1": "value1", "key2": "value2"}));
        let data_type = DataType::new("ExampleType", Some(params.clone()), None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
        assert_eq!(data_type.metadata(), Some(&params));
    }

    #[rstest]
    fn test_data_type_creation_without_metadata() {
        let bare = DataType::new("ExampleType", None, None);

        assert_eq!(bare.type_name(), "ExampleType");
        assert_eq!(bare.topic(), "ExampleType");
        assert_eq!(bare.metadata(), None);
    }

    #[rstest]
    fn test_data_type_equality() {
        // Two independently constructed values with identical inputs must compare equal.
        let build = || {
            let metadata = Some(params_from_json(json!({"key1": "value1"})));
            DataType::new("ExampleType", metadata, None)
        };

        assert_eq!(build(), build());
    }

    #[rstest]
    fn test_data_type_inequality() {
        let with_key1 = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );
        let with_key2 = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key2": "value2"}))),
            None,
        );

        assert_ne!(with_key1, with_key2);
    }

    #[rstest]
    fn test_data_type_ordering() {
        let type_a = DataType::new(
            "ExampleTypeA",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );
        let type_b = DataType::new(
            "ExampleTypeB",
            Some(params_from_json(json!({"key2": "value2"}))),
            None,
        );

        assert!(type_a < type_b);
    }

    #[rstest]
    fn test_data_type_hash() {
        let params = params_from_json(json!({"key1": "value1"}));

        let first = DataType::new("ExampleType", Some(params.clone()), None);
        let second = DataType::new("ExampleType", Some(params), None);

        assert_eq!(hash_of(&first), hash_of(&second));
    }

    #[rstest]
    fn test_data_type_display() {
        let params = params_from_json(json!({"key1": "value1"}));
        let data_type = DataType::new("ExampleType", Some(params), None);

        assert_eq!(data_type.to_string(), "ExampleType.key1=value1");
    }

    #[rstest]
    fn test_data_type_debug() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        // Render the expectation first so the metadata can then be moved into the data type.
        let expected =
            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)");
        let data_type = DataType::new("ExampleType", metadata, None);

        assert_eq!(format!("{data_type:?}"), expected);
    }

    #[rstest]
    fn test_parse_instrument_id_from_metadata() {
        let raw_id = "MSFT.XNAS";
        let params = params_from_json(json!({"instrument_id": raw_id}));
        let data_type = DataType::new("InstrumentAny", Some(params), None);
        let expected = InstrumentId::from_str(raw_id).unwrap();

        assert_eq!(data_type.instrument_id().unwrap(), expected);
    }

    #[rstest]
    fn test_parse_venue_from_metadata() {
        let raw_venue = "BINANCE";
        let params = params_from_json(json!({"venue": raw_venue}));
        let data_type = DataType::new(stringify!(InstrumentAny), Some(params), None);

        assert_eq!(data_type.venue().unwrap(), Venue::new(raw_venue));
    }

    #[rstest]
    fn test_parse_start_from_metadata() {
        let start_ns = 1_600_054_595_844_758_000;
        let params = params_from_json(json!({"start": start_ns.to_string()}));
        let data_type = DataType::new(stringify!(TradeTick), Some(params), None);

        assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns));
    }

    #[rstest]
    fn test_parse_end_from_metadata() {
        let end_ns = 1_720_954_595_844_758_000;
        let params = params_from_json(json!({"end": end_ns.to_string()}));
        let data_type = DataType::new(stringify!(TradeTick), Some(params), None);

        assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns));
    }

    #[rstest]
    fn test_parse_limit_from_metadata() {
        let limit = 1000;
        let params = params_from_json(json!({"limit": limit}));
        let data_type = DataType::new(stringify!(TradeTick), Some(params), None);

        assert_eq!(data_type.limit().unwrap(), limit);
    }

    #[rstest]
    fn test_data_type_persistence_json_with_identifier() {
        let original = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));

        // The persisted form must carry the identifier and must not contain "topic".
        let encoded = original.to_persistence_json().unwrap();
        assert!(!encoded.contains("topic"));
        assert!(encoded.contains("\"identifier\":\"venue//symbol\""));

        // Round-trip: the restored value exposes the same name, identifier and topic.
        let decoded = DataType::from_persistence_json(&encoded).unwrap();
        assert_eq!(decoded.type_name(), "MyCustomType");
        assert_eq!(decoded.identifier(), Some("venue//symbol"));
        assert_eq!(decoded.topic(), "MyCustomType");
    }

    #[rstest]
    fn test_data_type_identifier_getter() {
        let with_id = DataType::new("T", None, Some("id".to_string()));
        let without_id = DataType::new("T", None, None);

        assert_eq!(with_id.identifier(), Some("id"));
        assert_eq!(without_id.identifier(), None);
    }
}