Skip to main content

nautilus_model/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data types for the trading domain model.
17
18pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41    fmt::{Debug, Display},
42    hash::{Hash, Hasher},
43    str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50// Re-exports
51#[rustfmt::skip]  // Keep these grouped
52pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58    CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62    get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70    BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71    YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77#[cfg(feature = "arrow")]
78pub use registry::{
79    ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, encode_custom_to_arrow,
80    ensure_arrow_registered, get_arrow_schema, register_arrow,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84    PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85    ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86    register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use registry::{
89    deserialize_custom_from_json, ensure_json_deserializer_registered, register_json_deserializer,
90};
91pub use status::InstrumentStatus;
92pub use trade::TradeTick;
93
94use crate::identifiers::{InstrumentId, Venue};
/// A built-in Nautilus data type.
///
/// Each variant wraps one of the concrete data types re-exported by this module.
/// `Clone`, `PartialEq`, `Serialize` and `Deserialize` are implemented by hand for this
/// enum (see the impls in this file) rather than derived.
///
/// Not recommended for storing large amounts of data, as the largest variant is significantly
/// larger (10x) than the smallest.
#[derive(Debug)]
pub enum Data {
    Delta(OrderBookDelta),
    Deltas(OrderBookDeltas_API),
    Depth10(Box<OrderBookDepth10>), // Boxed: this variant is significantly larger
    Quote(QuoteTick),
    Trade(TradeTick),
    Bar(Bar),
    MarkPriceUpdate(MarkPriceUpdate), // TODO: Rename to MarkPrice once Cython gone
    IndexPriceUpdate(IndexPriceUpdate), // TODO: Rename to IndexPrice once Cython gone
    InstrumentStatus(InstrumentStatus),
    InstrumentClose(InstrumentClose),
    Custom(CustomData),
}
113
/// A C-compatible representation of [`Data`] for FFI.
///
/// This enum mirrors the variants of [`Data`] except for two that have no counterpart here:
/// `Custom`, which is not FFI-safe, and `InstrumentStatus`. Converting either of those with
/// `DataFFI::try_from` returns an error.
#[cfg(feature = "ffi")]
#[repr(C)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
pub enum DataFFI {
    Delta(OrderBookDelta),
    Deltas(OrderBookDeltas_API),
    Depth10(Box<OrderBookDepth10>),
    Quote(QuoteTick),
    Trade(TradeTick),
    Bar(Bar),
    MarkPriceUpdate(MarkPriceUpdate),
    IndexPriceUpdate(IndexPriceUpdate),
    InstrumentClose(InstrumentClose),
}
133
#[cfg(feature = "ffi")]
impl TryFrom<Data> for DataFFI {
    type Error = anyhow::Error;

    /// Converts a [`Data`] value into the matching [`DataFFI`] variant.
    ///
    /// # Errors
    ///
    /// Returns an error for `Data::InstrumentStatus` and `Data::Custom`, neither of which
    /// has a `DataFFI` variant.
    fn try_from(value: Data) -> Result<Self, Self::Error> {
        let converted = match value {
            Data::Delta(delta) => Self::Delta(delta),
            Data::Deltas(deltas) => Self::Deltas(deltas),
            Data::Depth10(depth) => Self::Depth10(depth),
            Data::Quote(quote) => Self::Quote(quote),
            Data::Trade(trade) => Self::Trade(trade),
            Data::Bar(bar) => Self::Bar(bar),
            Data::MarkPriceUpdate(mark) => Self::MarkPriceUpdate(mark),
            Data::IndexPriceUpdate(index) => Self::IndexPriceUpdate(index),
            Data::InstrumentClose(close) => Self::InstrumentClose(close),
            // Variants without an FFI representation bail out early
            Data::InstrumentStatus(_) => {
                anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
            }
            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
        };

        Ok(converted)
    }
}
156
#[cfg(feature = "ffi")]
impl From<DataFFI> for Data {
    /// Converts an FFI value back into [`Data`]; every `DataFFI` variant has a direct
    /// counterpart, so this cannot fail.
    fn from(ffi: DataFFI) -> Self {
        match ffi {
            DataFFI::Delta(delta) => Self::Delta(delta),
            DataFFI::Deltas(deltas) => Self::Deltas(deltas),
            DataFFI::Depth10(depth) => Self::Depth10(depth),
            DataFFI::Quote(quote) => Self::Quote(quote),
            DataFFI::Trade(trade) => Self::Trade(trade),
            DataFFI::Bar(bar) => Self::Bar(bar),
            DataFFI::MarkPriceUpdate(mark) => Self::MarkPriceUpdate(mark),
            DataFFI::IndexPriceUpdate(index) => Self::IndexPriceUpdate(index),
            DataFFI::InstrumentClose(close) => Self::InstrumentClose(close),
        }
    }
}
173
174impl<'de> Deserialize<'de> for Data {
175    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
176    where
177        D: serde::Deserializer<'de>,
178    {
179        use serde::de::Error;
180        let value = serde_json::Value::deserialize(deserializer)?;
181        let type_name = value
182            .get("type")
183            .and_then(|v| v.as_str())
184            .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
185            .to_string();
186
187        match type_name.as_str() {
188            "OrderBookDelta" => Ok(Self::Delta(
189                serde_json::from_value(value).map_err(D::Error::custom)?,
190            )),
191            "OrderBookDeltas" => Ok(Self::Deltas(
192                serde_json::from_value(value).map_err(D::Error::custom)?,
193            )),
194            "OrderBookDepth10" => Ok(Self::Depth10(
195                serde_json::from_value(value).map_err(D::Error::custom)?,
196            )),
197            "QuoteTick" => Ok(Self::Quote(
198                serde_json::from_value(value).map_err(D::Error::custom)?,
199            )),
200            "TradeTick" => Ok(Self::Trade(
201                serde_json::from_value(value).map_err(D::Error::custom)?,
202            )),
203            "Bar" => Ok(Self::Bar(
204                serde_json::from_value(value).map_err(D::Error::custom)?,
205            )),
206            "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
207                serde_json::from_value(value).map_err(D::Error::custom)?,
208            )),
209            "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
210                serde_json::from_value(value).map_err(D::Error::custom)?,
211            )),
212            "InstrumentStatus" => Ok(Self::InstrumentStatus(
213                serde_json::from_value(value).map_err(D::Error::custom)?,
214            )),
215            "InstrumentClose" => Ok(Self::InstrumentClose(
216                serde_json::from_value(value).map_err(D::Error::custom)?,
217            )),
218            _ => {
219                if let Some(data) =
220                    deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
221                {
222                    Ok(data)
223                } else {
224                    Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
225                }
226            }
227        }
228    }
229}
230
231impl Clone for Data {
232    fn clone(&self) -> Self {
233        match self {
234            Self::Delta(x) => Self::Delta(*x),
235            Self::Deltas(x) => Self::Deltas(x.clone()),
236            Self::Depth10(x) => Self::Depth10(x.clone()),
237            Self::Quote(x) => Self::Quote(*x),
238            Self::Trade(x) => Self::Trade(*x),
239            Self::Bar(x) => Self::Bar(*x),
240            Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
241            Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
242            Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
243            Self::InstrumentClose(x) => Self::InstrumentClose(*x),
244            Self::Custom(x) => Self::Custom(x.clone()),
245        }
246    }
247}
248
249impl PartialEq for Data {
250    fn eq(&self, other: &Self) -> bool {
251        match (self, other) {
252            (Self::Delta(a), Self::Delta(b)) => a == b,
253            (Self::Deltas(a), Self::Deltas(b)) => a == b,
254            (Self::Depth10(a), Self::Depth10(b)) => a == b,
255            (Self::Quote(a), Self::Quote(b)) => a == b,
256            (Self::Trade(a), Self::Trade(b)) => a == b,
257            (Self::Bar(a), Self::Bar(b)) => a == b,
258            (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
259            (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
260            (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
261            (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
262            (Self::Custom(a), Self::Custom(b)) => a == b,
263            _ => false,
264        }
265    }
266}
267
268impl Serialize for Data {
269    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
270    where
271        S: serde::Serializer,
272    {
273        match self {
274            Self::Delta(x) => x.serialize(serializer),
275            Self::Deltas(x) => x.serialize(serializer),
276            Self::Depth10(x) => x.serialize(serializer),
277            Self::Quote(x) => x.serialize(serializer),
278            Self::Trade(x) => x.serialize(serializer),
279            Self::Bar(x) => x.serialize(serializer),
280            Self::MarkPriceUpdate(x) => x.serialize(serializer),
281            Self::IndexPriceUpdate(x) => x.serialize(serializer),
282            Self::InstrumentStatus(x) => x.serialize(serializer),
283            Self::InstrumentClose(x) => x.serialize(serializer),
284            Self::Custom(x) => x.serialize(serializer),
285        }
286    }
287}
288
/// Implements `TryFrom<Data>` for the payload type of one [`Data`] variant.
///
/// The conversion yields the payload when the value is that variant and `Err(())`
/// for every other variant.
macro_rules! impl_try_from_data {
    ($arm:ident, $payload:ty) => {
        impl TryFrom<Data> for $payload {
            type Error = ();

            fn try_from(value: Data) -> Result<Self, Self::Error> {
                if let Data::$arm(inner) = value {
                    Ok(inner)
                } else {
                    Err(())
                }
            }
        }
    };
}
303
304impl TryFrom<Data> for OrderBookDepth10 {
305    type Error = ();
306
307    fn try_from(value: Data) -> Result<Self, Self::Error> {
308        match value {
309            Data::Depth10(x) => Ok(*x),
310            _ => Err(()),
311        }
312    }
313}
314
// `TryFrom<Data>` for each variant whose payload is stored inline. The boxed `Depth10`
// payload has its own hand-written impl, and no conversion is generated for `Custom`.
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);
impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
impl_try_from_data!(InstrumentStatus, InstrumentStatus);
impl_try_from_data!(InstrumentClose, InstrumentClose);
324
325/// Converts a vector of `Data` items to a specific variant type.
326///
327/// Filters and converts the data vector, keeping only items that can be
328/// successfully converted to the target type `T`.
329#[must_use]
330pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
331    data.into_iter()
332        .filter_map(|d| T::try_from(d).ok())
333        .collect()
334}
335
336impl Data {
337    /// Returns the instrument ID for the data.
338    #[must_use]
339    pub fn instrument_id(&self) -> InstrumentId {
340        match self {
341            Self::Delta(delta) => delta.instrument_id,
342            Self::Deltas(deltas) => deltas.instrument_id,
343            Self::Depth10(depth) => depth.instrument_id,
344            Self::Quote(quote) => quote.instrument_id,
345            Self::Trade(trade) => trade.instrument_id,
346            Self::Bar(bar) => bar.bar_type.instrument_id(),
347            Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
348            Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
349            Self::InstrumentStatus(status) => status.instrument_id,
350            Self::InstrumentClose(close) => close.instrument_id,
351            Self::Custom(custom) => custom
352                .data_type
353                .identifier()
354                .and_then(|s| InstrumentId::from_str(s).ok())
355                .or_else(|| {
356                    custom
357                        .data_type
358                        .metadata()
359                        .and_then(|m| m.get_str("instrument_id"))
360                        .and_then(|s| InstrumentId::from_str(s).ok())
361                })
362                .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
363        }
364    }
365
366    /// Returns whether the data is a type of order book data.
367    #[must_use]
368    pub fn is_order_book_data(&self) -> bool {
369        matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
370    }
371}
372
/// Trait for types that carry a creation timestamp.
///
/// `ts_init` is the moment (UNIX nanoseconds) when this value was first generated or
/// ingested by Nautilus. It can be used for sequencing, latency measurements,
/// or monitoring data-pipeline delays.
pub trait HasTsInit {
    /// Returns the UNIX timestamp (nanoseconds) when the instance was created.
    fn ts_init(&self) -> UnixNanos;
}
382
/// Trait for data types that have a catalog path prefix.
///
/// The prefix is a property of the type, not of a value: `path_prefix` takes no `self`.
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    fn path_prefix() -> &'static str;
}
388
/// Macro for implementing [`CatalogPathPrefix`] for data types.
///
/// This macro provides a convenient way to implement the trait for multiple types
/// with their corresponding path prefixes. It is exported and names the trait through
/// `$crate::data::CatalogPathPrefix`, so the expansion does not depend on what the call
/// site has imported.
///
/// # Parameters
///
/// - `$type`: The data type to implement the trait for.
/// - `$path`: The path prefix string for that type; it is returned as `&'static str`.
#[macro_export]
macro_rules! impl_catalog_path_prefix {
    ($type:ty, $path:expr) => {
        impl $crate::data::CatalogPathPrefix for $type {
            fn path_prefix() -> &'static str {
                $path
            }
        }
    };
}
408
// Standard implementations for financial data types
// NOTE(review): these strings presumably name existing catalog directories, so renaming
// one would orphan previously written data — confirm before changing any of them.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

// Instruments share the same mechanism
use crate::instruments::InstrumentAny;
impl_catalog_path_prefix!(InstrumentAny, "instruments");
423
424impl HasTsInit for Data {
425    fn ts_init(&self) -> UnixNanos {
426        match self {
427            Self::Delta(d) => d.ts_init,
428            Self::Deltas(d) => d.ts_init,
429            Self::Depth10(d) => d.ts_init,
430            Self::Quote(q) => q.ts_init,
431            Self::Trade(t) => t.ts_init,
432            Self::Bar(b) => b.ts_init,
433            Self::MarkPriceUpdate(p) => p.ts_init,
434            Self::IndexPriceUpdate(p) => p.ts_init,
435            Self::InstrumentStatus(s) => s.ts_init,
436            Self::InstrumentClose(c) => c.ts_init,
437            Self::Custom(c) => c.data.ts_init(),
438        }
439    }
440}
441
442/// Checks if the data slice is monotonically increasing by initialization timestamp.
443///
444/// Returns `true` if each element's `ts_init` is less than or equal to the next element's `ts_init`.
445pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
446    data.array_windows()
447        .all(|[a, b]| a.ts_init() <= b.ts_init())
448}
449
450impl From<OrderBookDelta> for Data {
451    fn from(value: OrderBookDelta) -> Self {
452        Self::Delta(value)
453    }
454}
455
456impl From<OrderBookDeltas_API> for Data {
457    fn from(value: OrderBookDeltas_API) -> Self {
458        Self::Deltas(value)
459    }
460}
461
462impl From<OrderBookDepth10> for Data {
463    fn from(value: OrderBookDepth10) -> Self {
464        Self::Depth10(Box::new(value))
465    }
466}
467
468impl From<QuoteTick> for Data {
469    fn from(value: QuoteTick) -> Self {
470        Self::Quote(value)
471    }
472}
473
474impl From<TradeTick> for Data {
475    fn from(value: TradeTick) -> Self {
476        Self::Trade(value)
477    }
478}
479
480impl From<Bar> for Data {
481    fn from(value: Bar) -> Self {
482        Self::Bar(value)
483    }
484}
485
486impl From<MarkPriceUpdate> for Data {
487    fn from(value: MarkPriceUpdate) -> Self {
488        Self::MarkPriceUpdate(value)
489    }
490}
491
492impl From<IndexPriceUpdate> for Data {
493    fn from(value: IndexPriceUpdate) -> Self {
494        Self::IndexPriceUpdate(value)
495    }
496}
497
498impl From<InstrumentStatus> for Data {
499    fn from(value: InstrumentStatus) -> Self {
500        Self::InstrumentStatus(value)
501    }
502}
503
504impl From<InstrumentClose> for Data {
505    fn from(value: InstrumentClose) -> Self {
506        Self::InstrumentClose(value)
507    }
508}
509
510/// Builds a string-only view of a JSON value for use in topic (key=value).
511fn value_to_topic_string(v: &JsonValue) -> String {
512    if let Some(s) = v.as_str() {
513        return s.to_string();
514    }
515
516    if let Some(n) = v.as_u64() {
517        return n.to_string();
518    }
519
520    if let Some(n) = v.as_i64() {
521        return n.to_string();
522    }
523
524    if let Some(b) = v.as_bool() {
525        return b.to_string();
526    }
527
528    if let Some(f) = v.as_f64() {
529        return f.to_string();
530    }
531
532    if v.is_null() {
533        return "null".to_string();
534    }
535    serde_json::to_string(v).unwrap_or_default()
536}
537
538/// Builds the topic suffix from Params (string-only view: key=value joined by ".").
539fn params_to_topic_suffix(params: &Params) -> String {
540    params
541        .iter()
542        .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
543        .collect::<Vec<_>>()
544        .join(".")
545}
546
/// Represents a data type including metadata.
///
/// Equality, ordering and `Display` are based on `topic` alone, and hashing uses the
/// precomputed `hash` (see the trait impls in this file).
#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct DataType {
    // Name of the data type.
    type_name: String,
    // Optional metadata parameters.
    metadata: Option<Params>,
    // Messaging topic, precomputed by the constructors.
    topic: String,
    // Hash of `topic`, precomputed by the constructors.
    hash: u64,
    // Optional catalog path identifier.
    identifier: Option<String>,
}
564
565impl DataType {
566    /// Creates a new [`DataType`] instance.
567    #[must_use]
568    pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
569        // Precompute topic from type_name + metadata (string-only view for backward compatibility)
570        let topic = if let Some(ref meta) = metadata {
571            if meta.is_empty() {
572                type_name.to_string()
573            } else {
574                format!("{type_name}.{}", params_to_topic_suffix(meta))
575            }
576        } else {
577            type_name.to_string()
578        };
579
580        // Precompute hash
581        let mut hasher = std::collections::hash_map::DefaultHasher::new();
582        topic.hash(&mut hasher);
583
584        Self {
585            type_name: type_name.to_owned(),
586            metadata,
587            topic,
588            hash: hasher.finish(),
589            identifier,
590        }
591    }
592
593    /// Creates a [`DataType`] from persisted parts (`type_name`, topic, metadata).
594    /// Hash is recomputed from topic. Use when restoring from legacy `data_type` column.
595    /// Identifier is set to None.
596    #[must_use]
597    pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
598        let mut hasher = std::collections::hash_map::DefaultHasher::new();
599        topic.hash(&mut hasher);
600        Self {
601            type_name: type_name.to_owned(),
602            metadata,
603            topic: topic.to_owned(),
604            hash: hasher.finish(),
605            identifier: None,
606        }
607    }
608
609    /// Serializes to JSON for persistence (`type_name`, metadata, identifier; no topic, no hash).
610    ///
611    /// # Errors
612    ///
613    /// Returns a JSON serialization error if the data cannot be serialized.
614    pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
615        let mut map = serde_json::Map::new();
616        map.insert(
617            "type_name".to_string(),
618            serde_json::Value::String(self.type_name.clone()),
619        );
620        map.insert(
621            "metadata".to_string(),
622            self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
623                serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
624            }),
625        );
626
627        if let Some(ref id) = self.identifier {
628            map.insert(
629                "identifier".to_string(),
630                serde_json::Value::String(id.clone()),
631            );
632        }
633        serde_json::to_string(&serde_json::Value::Object(map))
634    }
635
636    /// Deserializes from JSON produced by `to_persistence_json`.
637    /// Accepts legacy JSON with `topic` (ignored); topic is rebuilt from `type_name` + metadata.
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if the string is not valid JSON or missing required fields.
642    pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
643        let value: serde_json::Value =
644            serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
645        let obj = value
646            .as_object()
647            .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
648        let type_name = obj
649            .get("type_name")
650            .and_then(|v| v.as_str())
651            .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
652            .to_string();
653        let metadata = obj.get("metadata").and_then(|m| {
654            if m.is_null() {
655                None
656            } else {
657                let p: Params = serde_json::from_value(m.clone()).ok()?;
658                if p.is_empty() { None } else { Some(p) }
659            }
660        });
661        let identifier = obj
662            .get("identifier")
663            .and_then(|v| v.as_str())
664            .map(String::from);
665        Ok(Self::new(&type_name, metadata, identifier))
666    }
667
668    /// Returns the type name for the data type.
669    #[must_use]
670    pub fn type_name(&self) -> &str {
671        self.type_name.as_str()
672    }
673
674    /// Returns the metadata for the data type.
675    #[must_use]
676    pub fn metadata(&self) -> Option<&Params> {
677        self.metadata.as_ref()
678    }
679
680    /// Returns a string representation of the metadata.
681    #[must_use]
682    pub fn metadata_str(&self) -> String {
683        self.metadata.as_ref().map_or_else(
684            || "null".to_string(),
685            |metadata| to_string(metadata).unwrap_or_default(),
686        )
687    }
688
689    /// Returns metadata as a string-only map (e.g. for Arrow schema metadata).
690    #[must_use]
691    pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
692        self.metadata.as_ref().map(|p| {
693            p.iter()
694                .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
695                .collect()
696        })
697    }
698
699    /// Returns the precomputed hash for this data type.
700    #[must_use]
701    pub fn precomputed_hash(&self) -> u64 {
702        self.hash
703    }
704
705    /// Returns the messaging topic for the data type.
706    #[must_use]
707    pub fn topic(&self) -> &str {
708        self.topic.as_str()
709    }
710
711    /// Returns the optional catalog path identifier (can contain subdirs, e.g. `"venue//symbol"`).
712    #[must_use]
713    pub fn identifier(&self) -> Option<&str> {
714        self.identifier.as_deref()
715    }
716
717    /// Returns an [`Option<InstrumentId>`] parsed from the metadata.
718    ///
719    /// # Panics
720    ///
721    /// This function panics if:
722    /// - There is no metadata.
723    /// - The `instrument_id` value contained in the metadata is invalid.
724    #[must_use]
725    pub fn instrument_id(&self) -> Option<InstrumentId> {
726        let metadata = self.metadata.as_ref().expect("metadata was `None`");
727        let instrument_id = metadata.get_str("instrument_id")?;
728        Some(
729            InstrumentId::from_str(instrument_id)
730                .expect("Invalid `InstrumentId` for 'instrument_id'"),
731        )
732    }
733
734    /// Returns an [`Option<Venue>`] parsed from the metadata.
735    ///
736    /// # Panics
737    ///
738    /// This function panics if:
739    /// - There is no metadata.
740    /// - The `venue` value contained in the metadata is invalid.
741    #[must_use]
742    pub fn venue(&self) -> Option<Venue> {
743        let metadata = self.metadata.as_ref().expect("metadata was `None`");
744        let venue_str = metadata.get_str("venue")?;
745        Some(Venue::from(venue_str))
746    }
747
748    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `start` field.
749    ///
750    /// # Panics
751    ///
752    /// This function panics if:
753    /// - There is no metadata.
754    /// - The `start` value contained in the metadata is invalid.
755    #[must_use]
756    pub fn start(&self) -> Option<UnixNanos> {
757        let metadata = self.metadata.as_ref()?;
758        let start_str = metadata.get_str("start")?;
759        Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
760    }
761
762    /// Returns an [`Option<UnixNanos>`] parsed from the metadata `end` field.
763    ///
764    /// # Panics
765    ///
766    /// This function panics if:
767    /// - There is no metadata.
768    /// - The `end` value contained in the metadata is invalid.
769    #[must_use]
770    pub fn end(&self) -> Option<UnixNanos> {
771        let metadata = self.metadata.as_ref()?;
772        let end_str = metadata.get_str("end")?;
773        Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
774    }
775
776    /// Returns an [`Option<usize>`] parsed from the metadata `limit` field.
777    ///
778    /// # Panics
779    ///
780    /// This function panics if:
781    /// - There is no metadata.
782    /// - The `limit` value contained in the metadata is invalid.
783    #[must_use]
784    pub fn limit(&self) -> Option<usize> {
785        let metadata = self.metadata.as_ref()?;
786        metadata.get_usize("limit").or_else(|| {
787            metadata
788                .get_str("limit")
789                .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
790        })
791    }
792}
793
794impl PartialEq for DataType {
795    fn eq(&self, other: &Self) -> bool {
796        self.topic == other.topic
797    }
798}
799
800impl Eq for DataType {}
801
802impl PartialOrd for DataType {
803    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
804        Some(self.cmp(other))
805    }
806}
807
808impl Ord for DataType {
809    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
810        self.topic.cmp(&other.topic)
811    }
812}
813
814impl Hash for DataType {
815    fn hash<H: Hasher>(&self, state: &mut H) {
816        self.hash.hash(state);
817    }
818}
819
820impl Display for DataType {
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        write!(f, "{}", self.topic)
823    }
824}
825
826impl Debug for DataType {
827    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
828        write!(
829            f,
830            "DataType(type_name={}, metadata={:?}, identifier={:?})",
831            self.type_name, self.metadata, self.identifier
832        )
833    }
834}
835
#[cfg(test)]
mod tests {
    use std::hash::DefaultHasher;

    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Deserializes `value` into [`Params`], panicking when the JSON cannot be
    /// converted so that a malformed fixture fails the test straight away.
    fn params_from_json(value: serde_json::Value) -> Params {
        serde_json::from_value::<Params>(value).expect("valid Params JSON")
    }

    /// Feeds `data_type` into a fresh [`DefaultHasher`] and returns the digest.
    fn hash_of(data_type: &DataType) -> u64 {
        let mut hasher = DefaultHasher::new();
        data_type.hash(&mut hasher);
        hasher.finish()
    }

    /// A data type built with metadata reports its name, topic and metadata.
    #[rstest]
    fn test_data_type_creation_with_metadata() {
        let params = params_from_json(json!({"key1": "value1", "key2": "value2"}));
        let data_type = DataType::new("ExampleType", Some(params.clone()), None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
        assert_eq!(data_type.metadata(), Some(&params));
    }

    /// A data type built without metadata has a topic equal to its type name.
    #[rstest]
    fn test_data_type_creation_without_metadata() {
        let bare = DataType::new("ExampleType", None, None);

        assert_eq!(bare.type_name(), "ExampleType");
        assert_eq!(bare.topic(), "ExampleType");
        assert_eq!(bare.metadata(), None);
    }

    /// Two data types with the same name and equal metadata compare equal.
    #[rstest]
    fn test_data_type_equality() {
        let lhs = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );
        let rhs = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );

        assert_eq!(lhs, rhs);
    }

    /// Two data types with the same name but different metadata are not equal.
    #[rstest]
    fn test_data_type_inequality() {
        let lhs = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );
        let rhs = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key2": "value2"}))),
            None,
        );

        assert_ne!(lhs, rhs);
    }

    /// The `ExampleTypeA` fixture orders before the `ExampleTypeB` fixture.
    #[rstest]
    fn test_data_type_ordering() {
        let smaller = DataType::new(
            "ExampleTypeA",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );
        let larger = DataType::new(
            "ExampleTypeB",
            Some(params_from_json(json!({"key2": "value2"}))),
            None,
        );

        assert!(smaller < larger);
    }

    /// Data types built from identical inputs produce identical hashes.
    #[rstest]
    fn test_data_type_hash() {
        let params = params_from_json(json!({"key1": "value1"}));

        let first = DataType::new("ExampleType", Some(params.clone()), None);
        let second = DataType::new("ExampleType", Some(params), None);

        assert_eq!(hash_of(&first), hash_of(&second));
    }

    /// The `Display` output for this fixture is `ExampleType.key1=value1`.
    #[rstest]
    fn test_data_type_display() {
        let data_type = DataType::new(
            "ExampleType",
            Some(params_from_json(json!({"key1": "value1"}))),
            None,
        );

        assert_eq!(data_type.to_string(), "ExampleType.key1=value1");
    }

    /// The `Debug` output lists the type name, metadata and identifier.
    #[rstest]
    fn test_data_type_debug() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        let rendered = format!("{data_type:?}");
        let expected =
            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)");

        assert_eq!(rendered, expected);
    }

    /// An `instrument_id` metadata entry is returned as an `InstrumentId`.
    #[rstest]
    fn test_parse_instrument_id_from_metadata() {
        let raw_id = "MSFT.XNAS";
        let expected = InstrumentId::from_str(raw_id).unwrap();

        let data_type = DataType::new(
            "InstrumentAny",
            Some(params_from_json(json!({"instrument_id": raw_id}))),
            None,
        );

        assert_eq!(data_type.instrument_id().unwrap(), expected);
    }

    /// A `venue` metadata entry is returned as a `Venue`.
    #[rstest]
    fn test_parse_venue_from_metadata() {
        let raw_venue = "BINANCE";
        let expected = Venue::new(raw_venue);

        let data_type = DataType::new(
            stringify!(InstrumentAny),
            Some(params_from_json(json!({"venue": raw_venue}))),
            None,
        );

        assert_eq!(data_type.venue().unwrap(), expected);
    }

    /// A `start` metadata entry given as a decimal string is returned as `UnixNanos`.
    #[rstest]
    fn test_parse_start_from_metadata() {
        let start_ns = 1_600_054_595_844_758_000;
        let expected = UnixNanos::from(start_ns);

        let data_type = DataType::new(
            stringify!(TradeTick),
            Some(params_from_json(json!({"start": start_ns.to_string()}))),
            None,
        );

        assert_eq!(data_type.start().unwrap(), expected);
    }

    /// An `end` metadata entry given as a decimal string is returned as `UnixNanos`.
    #[rstest]
    fn test_parse_end_from_metadata() {
        let end_ns = 1_720_954_595_844_758_000;
        let expected = UnixNanos::from(end_ns);

        let data_type = DataType::new(
            stringify!(TradeTick),
            Some(params_from_json(json!({"end": end_ns.to_string()}))),
            None,
        );

        assert_eq!(data_type.end().unwrap(), expected);
    }

    /// A numeric `limit` metadata entry is returned unchanged by `limit()`.
    #[rstest]
    fn test_parse_limit_from_metadata() {
        let expected_limit = 1000;

        let data_type = DataType::new(
            stringify!(TradeTick),
            Some(params_from_json(json!({"limit": expected_limit}))),
            None,
        );

        assert_eq!(data_type.limit().unwrap(), expected_limit);
    }

    /// The persistence JSON keeps the identifier, omits any `topic` text, and
    /// round-trips back to a data type with the same name, identifier and topic.
    #[rstest]
    fn test_data_type_persistence_json_with_identifier() {
        let original = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));

        let serialized = original.to_persistence_json().unwrap();
        assert!(!serialized.contains("topic"));
        assert!(serialized.contains("\"identifier\":\"venue//symbol\""));

        let restored = DataType::from_persistence_json(&serialized).unwrap();
        assert_eq!(restored.type_name(), "MyCustomType");
        assert_eq!(restored.identifier(), Some("venue//symbol"));
        assert_eq!(restored.topic(), "MyCustomType");
    }

    /// `identifier()` returns the supplied identifier, or `None` when absent.
    #[rstest]
    fn test_data_type_identifier_getter() {
        let with_id = DataType::new("T", None, Some("id".to_string()));
        assert_eq!(with_id.identifier(), Some("id"));

        let without_id = DataType::new("T", None, None);
        assert_eq!(without_id.identifier(), None);
    }
}