1pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41 fmt::{Debug, Display},
42 hash::{Hash, Hasher},
43 str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50#[rustfmt::skip] pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58 CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62 get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70 BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71 YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77pub use registry::{
78 ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, deserialize_custom_from_json,
79 encode_custom_to_arrow, ensure_arrow_registered, ensure_json_deserializer_registered,
80 get_arrow_schema, register_arrow, register_json_deserializer,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84 PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85 ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86 register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use status::InstrumentStatus;
89pub use trade::TradeTick;
90
91use crate::identifiers::{InstrumentId, Venue};
92#[derive(Debug)]
97pub enum Data {
98 Delta(OrderBookDelta),
99 Deltas(OrderBookDeltas_API),
100 Depth10(Box<OrderBookDepth10>), Quote(QuoteTick),
102 Trade(TradeTick),
103 Bar(Bar),
104 MarkPriceUpdate(MarkPriceUpdate), IndexPriceUpdate(IndexPriceUpdate), InstrumentStatus(InstrumentStatus),
107 InstrumentClose(InstrumentClose),
108 Custom(CustomData),
109}
110
111#[cfg(feature = "ffi")]
116#[repr(C)]
117#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
118#[allow(non_camel_case_types)]
119pub enum DataFFI {
120 Delta(OrderBookDelta),
121 Deltas(OrderBookDeltas_API),
122 Depth10(Box<OrderBookDepth10>),
123 Quote(QuoteTick),
124 Trade(TradeTick),
125 Bar(Bar),
126 MarkPriceUpdate(MarkPriceUpdate),
127 IndexPriceUpdate(IndexPriceUpdate),
128 InstrumentClose(InstrumentClose),
129}
130
131#[cfg(feature = "ffi")]
132impl TryFrom<Data> for DataFFI {
133 type Error = anyhow::Error;
134
135 fn try_from(value: Data) -> Result<Self, Self::Error> {
136 match value {
137 Data::Delta(x) => Ok(Self::Delta(x)),
138 Data::Deltas(x) => Ok(Self::Deltas(x)),
139 Data::Depth10(x) => Ok(Self::Depth10(x)),
140 Data::Quote(x) => Ok(Self::Quote(x)),
141 Data::Trade(x) => Ok(Self::Trade(x)),
142 Data::Bar(x) => Ok(Self::Bar(x)),
143 Data::MarkPriceUpdate(x) => Ok(Self::MarkPriceUpdate(x)),
144 Data::IndexPriceUpdate(x) => Ok(Self::IndexPriceUpdate(x)),
145 Data::InstrumentStatus(_) => {
146 anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
147 }
148 Data::InstrumentClose(x) => Ok(Self::InstrumentClose(x)),
149 Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
150 }
151 }
152}
153
154#[cfg(feature = "ffi")]
155impl From<DataFFI> for Data {
156 fn from(value: DataFFI) -> Self {
157 match value {
158 DataFFI::Delta(x) => Self::Delta(x),
159 DataFFI::Deltas(x) => Self::Deltas(x),
160 DataFFI::Depth10(x) => Self::Depth10(x),
161 DataFFI::Quote(x) => Self::Quote(x),
162 DataFFI::Trade(x) => Self::Trade(x),
163 DataFFI::Bar(x) => Self::Bar(x),
164 DataFFI::MarkPriceUpdate(x) => Self::MarkPriceUpdate(x),
165 DataFFI::IndexPriceUpdate(x) => Self::IndexPriceUpdate(x),
166 DataFFI::InstrumentClose(x) => Self::InstrumentClose(x),
167 }
168 }
169}
170
171impl<'de> Deserialize<'de> for Data {
172 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173 where
174 D: serde::Deserializer<'de>,
175 {
176 use serde::de::Error;
177 let value = serde_json::Value::deserialize(deserializer)?;
178 let type_name = value
179 .get("type")
180 .and_then(|v| v.as_str())
181 .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
182 .to_string();
183
184 match type_name.as_str() {
185 "OrderBookDelta" => Ok(Self::Delta(
186 serde_json::from_value(value).map_err(D::Error::custom)?,
187 )),
188 "OrderBookDeltas" => Ok(Self::Deltas(
189 serde_json::from_value(value).map_err(D::Error::custom)?,
190 )),
191 "OrderBookDepth10" => Ok(Self::Depth10(
192 serde_json::from_value(value).map_err(D::Error::custom)?,
193 )),
194 "QuoteTick" => Ok(Self::Quote(
195 serde_json::from_value(value).map_err(D::Error::custom)?,
196 )),
197 "TradeTick" => Ok(Self::Trade(
198 serde_json::from_value(value).map_err(D::Error::custom)?,
199 )),
200 "Bar" => Ok(Self::Bar(
201 serde_json::from_value(value).map_err(D::Error::custom)?,
202 )),
203 "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
204 serde_json::from_value(value).map_err(D::Error::custom)?,
205 )),
206 "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
207 serde_json::from_value(value).map_err(D::Error::custom)?,
208 )),
209 "InstrumentStatus" => Ok(Self::InstrumentStatus(
210 serde_json::from_value(value).map_err(D::Error::custom)?,
211 )),
212 "InstrumentClose" => Ok(Self::InstrumentClose(
213 serde_json::from_value(value).map_err(D::Error::custom)?,
214 )),
215 _ => {
216 if let Some(data) =
217 deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
218 {
219 Ok(data)
220 } else {
221 Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
222 }
223 }
224 }
225 }
226}
227
228impl Clone for Data {
229 fn clone(&self) -> Self {
230 match self {
231 Self::Delta(x) => Self::Delta(*x),
232 Self::Deltas(x) => Self::Deltas(x.clone()),
233 Self::Depth10(x) => Self::Depth10(x.clone()),
234 Self::Quote(x) => Self::Quote(*x),
235 Self::Trade(x) => Self::Trade(*x),
236 Self::Bar(x) => Self::Bar(*x),
237 Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
238 Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
239 Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
240 Self::InstrumentClose(x) => Self::InstrumentClose(*x),
241 Self::Custom(x) => Self::Custom(x.clone()),
242 }
243 }
244}
245
246impl PartialEq for Data {
247 fn eq(&self, other: &Self) -> bool {
248 match (self, other) {
249 (Self::Delta(a), Self::Delta(b)) => a == b,
250 (Self::Deltas(a), Self::Deltas(b)) => a == b,
251 (Self::Depth10(a), Self::Depth10(b)) => a == b,
252 (Self::Quote(a), Self::Quote(b)) => a == b,
253 (Self::Trade(a), Self::Trade(b)) => a == b,
254 (Self::Bar(a), Self::Bar(b)) => a == b,
255 (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
256 (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
257 (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
258 (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
259 (Self::Custom(a), Self::Custom(b)) => a == b,
260 _ => false,
261 }
262 }
263}
264
265impl Serialize for Data {
266 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
267 where
268 S: serde::Serializer,
269 {
270 match self {
271 Self::Delta(x) => x.serialize(serializer),
272 Self::Deltas(x) => x.serialize(serializer),
273 Self::Depth10(x) => x.serialize(serializer),
274 Self::Quote(x) => x.serialize(serializer),
275 Self::Trade(x) => x.serialize(serializer),
276 Self::Bar(x) => x.serialize(serializer),
277 Self::MarkPriceUpdate(x) => x.serialize(serializer),
278 Self::IndexPriceUpdate(x) => x.serialize(serializer),
279 Self::InstrumentStatus(x) => x.serialize(serializer),
280 Self::InstrumentClose(x) => x.serialize(serializer),
281 Self::Custom(x) => x.serialize(serializer),
282 }
283 }
284}
285
286macro_rules! impl_try_from_data {
287 ($variant:ident, $type:ty) => {
288 impl TryFrom<Data> for $type {
289 type Error = ();
290
291 fn try_from(value: Data) -> Result<Self, Self::Error> {
292 match value {
293 Data::$variant(x) => Ok(x),
294 _ => Err(()),
295 }
296 }
297 }
298 };
299}
300
301impl TryFrom<Data> for OrderBookDepth10 {
302 type Error = ();
303
304 fn try_from(value: Data) -> Result<Self, Self::Error> {
305 match value {
306 Data::Depth10(x) => Ok(*x),
307 _ => Err(()),
308 }
309 }
310}
311
312impl_try_from_data!(Quote, QuoteTick);
313impl_try_from_data!(Delta, OrderBookDelta);
314impl_try_from_data!(Deltas, OrderBookDeltas_API);
315impl_try_from_data!(Trade, TradeTick);
316impl_try_from_data!(Bar, Bar);
317impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
318impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
319impl_try_from_data!(InstrumentStatus, InstrumentStatus);
320impl_try_from_data!(InstrumentClose, InstrumentClose);
321
322#[must_use]
327pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
328 data.into_iter()
329 .filter_map(|d| T::try_from(d).ok())
330 .collect()
331}
332
333impl Data {
334 #[must_use]
336 pub fn instrument_id(&self) -> InstrumentId {
337 match self {
338 Self::Delta(delta) => delta.instrument_id,
339 Self::Deltas(deltas) => deltas.instrument_id,
340 Self::Depth10(depth) => depth.instrument_id,
341 Self::Quote(quote) => quote.instrument_id,
342 Self::Trade(trade) => trade.instrument_id,
343 Self::Bar(bar) => bar.bar_type.instrument_id(),
344 Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
345 Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
346 Self::InstrumentStatus(status) => status.instrument_id,
347 Self::InstrumentClose(close) => close.instrument_id,
348 Self::Custom(custom) => custom
349 .data_type
350 .identifier()
351 .and_then(|s| InstrumentId::from_str(s).ok())
352 .or_else(|| {
353 custom
354 .data_type
355 .metadata()
356 .and_then(|m| m.get_str("instrument_id"))
357 .and_then(|s| InstrumentId::from_str(s).ok())
358 })
359 .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
360 }
361 }
362
363 #[must_use]
365 pub fn is_order_book_data(&self) -> bool {
366 matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
367 }
368}
369
370pub trait HasTsInit {
376 fn ts_init(&self) -> UnixNanos;
378}
379
380pub trait CatalogPathPrefix {
382 fn path_prefix() -> &'static str;
384}
385
386#[macro_export]
396macro_rules! impl_catalog_path_prefix {
397 ($type:ty, $path:expr) => {
398 impl $crate::data::CatalogPathPrefix for $type {
399 fn path_prefix() -> &'static str {
400 $path
401 }
402 }
403 };
404}
405
406impl_catalog_path_prefix!(QuoteTick, "quotes");
408impl_catalog_path_prefix!(TradeTick, "trades");
409impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
410impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
411impl_catalog_path_prefix!(Bar, "bars");
412impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
413impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
414impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
415impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
416impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
417
418use crate::instruments::InstrumentAny;
419impl_catalog_path_prefix!(InstrumentAny, "instruments");
420
421impl HasTsInit for Data {
422 fn ts_init(&self) -> UnixNanos {
423 match self {
424 Self::Delta(d) => d.ts_init,
425 Self::Deltas(d) => d.ts_init,
426 Self::Depth10(d) => d.ts_init,
427 Self::Quote(q) => q.ts_init,
428 Self::Trade(t) => t.ts_init,
429 Self::Bar(b) => b.ts_init,
430 Self::MarkPriceUpdate(p) => p.ts_init,
431 Self::IndexPriceUpdate(p) => p.ts_init,
432 Self::InstrumentStatus(s) => s.ts_init,
433 Self::InstrumentClose(c) => c.ts_init,
434 Self::Custom(c) => c.data.ts_init(),
435 }
436 }
437}
438
439pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
443 data.array_windows()
444 .all(|[a, b]| a.ts_init() <= b.ts_init())
445}
446
447impl From<OrderBookDelta> for Data {
448 fn from(value: OrderBookDelta) -> Self {
449 Self::Delta(value)
450 }
451}
452
453impl From<OrderBookDeltas_API> for Data {
454 fn from(value: OrderBookDeltas_API) -> Self {
455 Self::Deltas(value)
456 }
457}
458
459impl From<OrderBookDepth10> for Data {
460 fn from(value: OrderBookDepth10) -> Self {
461 Self::Depth10(Box::new(value))
462 }
463}
464
465impl From<QuoteTick> for Data {
466 fn from(value: QuoteTick) -> Self {
467 Self::Quote(value)
468 }
469}
470
471impl From<TradeTick> for Data {
472 fn from(value: TradeTick) -> Self {
473 Self::Trade(value)
474 }
475}
476
477impl From<Bar> for Data {
478 fn from(value: Bar) -> Self {
479 Self::Bar(value)
480 }
481}
482
483impl From<MarkPriceUpdate> for Data {
484 fn from(value: MarkPriceUpdate) -> Self {
485 Self::MarkPriceUpdate(value)
486 }
487}
488
489impl From<IndexPriceUpdate> for Data {
490 fn from(value: IndexPriceUpdate) -> Self {
491 Self::IndexPriceUpdate(value)
492 }
493}
494
495impl From<InstrumentStatus> for Data {
496 fn from(value: InstrumentStatus) -> Self {
497 Self::InstrumentStatus(value)
498 }
499}
500
501impl From<InstrumentClose> for Data {
502 fn from(value: InstrumentClose) -> Self {
503 Self::InstrumentClose(value)
504 }
505}
506
507fn value_to_topic_string(v: &JsonValue) -> String {
509 if let Some(s) = v.as_str() {
510 return s.to_string();
511 }
512
513 if let Some(n) = v.as_u64() {
514 return n.to_string();
515 }
516
517 if let Some(n) = v.as_i64() {
518 return n.to_string();
519 }
520
521 if let Some(b) = v.as_bool() {
522 return b.to_string();
523 }
524
525 if let Some(f) = v.as_f64() {
526 return f.to_string();
527 }
528
529 if v.is_null() {
530 return "null".to_string();
531 }
532 serde_json::to_string(v).unwrap_or_default()
533}
534
535fn params_to_topic_suffix(params: &Params) -> String {
537 params
538 .iter()
539 .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
540 .collect::<Vec<_>>()
541 .join(".")
542}
543
544#[derive(Clone, Serialize, Deserialize)]
546#[cfg_attr(
547 feature = "python",
548 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
549)]
550#[cfg_attr(
551 feature = "python",
552 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
553)]
554pub struct DataType {
555 type_name: String,
556 metadata: Option<Params>,
557 topic: String,
558 hash: u64,
559 identifier: Option<String>,
560}
561
562impl DataType {
563 #[must_use]
565 pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
566 let topic = if let Some(ref meta) = metadata {
568 if meta.is_empty() {
569 type_name.to_string()
570 } else {
571 format!("{type_name}.{}", params_to_topic_suffix(meta))
572 }
573 } else {
574 type_name.to_string()
575 };
576
577 let mut hasher = std::collections::hash_map::DefaultHasher::new();
579 topic.hash(&mut hasher);
580
581 Self {
582 type_name: type_name.to_owned(),
583 metadata,
584 topic,
585 hash: hasher.finish(),
586 identifier,
587 }
588 }
589
590 #[must_use]
594 pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
595 let mut hasher = std::collections::hash_map::DefaultHasher::new();
596 topic.hash(&mut hasher);
597 Self {
598 type_name: type_name.to_owned(),
599 metadata,
600 topic: topic.to_owned(),
601 hash: hasher.finish(),
602 identifier: None,
603 }
604 }
605
606 pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
612 let mut map = serde_json::Map::new();
613 map.insert(
614 "type_name".to_string(),
615 serde_json::Value::String(self.type_name.clone()),
616 );
617 map.insert(
618 "metadata".to_string(),
619 self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
620 serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
621 }),
622 );
623
624 if let Some(ref id) = self.identifier {
625 map.insert(
626 "identifier".to_string(),
627 serde_json::Value::String(id.clone()),
628 );
629 }
630 serde_json::to_string(&serde_json::Value::Object(map))
631 }
632
633 pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
640 let value: serde_json::Value =
641 serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
642 let obj = value
643 .as_object()
644 .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
645 let type_name = obj
646 .get("type_name")
647 .and_then(|v| v.as_str())
648 .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
649 .to_string();
650 let metadata = obj.get("metadata").and_then(|m| {
651 if m.is_null() {
652 None
653 } else {
654 let p: Params = serde_json::from_value(m.clone()).ok()?;
655 if p.is_empty() { None } else { Some(p) }
656 }
657 });
658 let identifier = obj
659 .get("identifier")
660 .and_then(|v| v.as_str())
661 .map(String::from);
662 Ok(Self::new(&type_name, metadata, identifier))
663 }
664
665 #[must_use]
667 pub fn type_name(&self) -> &str {
668 self.type_name.as_str()
669 }
670
671 #[must_use]
673 pub fn metadata(&self) -> Option<&Params> {
674 self.metadata.as_ref()
675 }
676
677 #[must_use]
679 pub fn metadata_str(&self) -> String {
680 self.metadata.as_ref().map_or_else(
681 || "null".to_string(),
682 |metadata| to_string(metadata).unwrap_or_default(),
683 )
684 }
685
686 #[must_use]
688 pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
689 self.metadata.as_ref().map(|p| {
690 p.iter()
691 .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
692 .collect()
693 })
694 }
695
696 #[must_use]
698 pub fn precomputed_hash(&self) -> u64 {
699 self.hash
700 }
701
702 #[must_use]
704 pub fn topic(&self) -> &str {
705 self.topic.as_str()
706 }
707
708 #[must_use]
710 pub fn identifier(&self) -> Option<&str> {
711 self.identifier.as_deref()
712 }
713
714 #[must_use]
722 pub fn instrument_id(&self) -> Option<InstrumentId> {
723 let metadata = self.metadata.as_ref().expect("metadata was `None`");
724 let instrument_id = metadata.get_str("instrument_id")?;
725 Some(
726 InstrumentId::from_str(instrument_id)
727 .expect("Invalid `InstrumentId` for 'instrument_id'"),
728 )
729 }
730
731 #[must_use]
739 pub fn venue(&self) -> Option<Venue> {
740 let metadata = self.metadata.as_ref().expect("metadata was `None`");
741 let venue_str = metadata.get_str("venue")?;
742 Some(Venue::from(venue_str))
743 }
744
745 #[must_use]
753 pub fn start(&self) -> Option<UnixNanos> {
754 let metadata = self.metadata.as_ref()?;
755 let start_str = metadata.get_str("start")?;
756 Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
757 }
758
759 #[must_use]
767 pub fn end(&self) -> Option<UnixNanos> {
768 let metadata = self.metadata.as_ref()?;
769 let end_str = metadata.get_str("end")?;
770 Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
771 }
772
773 #[must_use]
781 pub fn limit(&self) -> Option<usize> {
782 let metadata = self.metadata.as_ref()?;
783 metadata.get_usize("limit").or_else(|| {
784 metadata
785 .get_str("limit")
786 .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
787 })
788 }
789}
790
791impl PartialEq for DataType {
792 fn eq(&self, other: &Self) -> bool {
793 self.topic == other.topic
794 }
795}
796
797impl Eq for DataType {}
798
799impl PartialOrd for DataType {
800 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
801 Some(self.cmp(other))
802 }
803}
804
805impl Ord for DataType {
806 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
807 self.topic.cmp(&other.topic)
808 }
809}
810
811impl Hash for DataType {
812 fn hash<H: Hasher>(&self, state: &mut H) {
813 self.hash.hash(state);
814 }
815}
816
817impl Display for DataType {
818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
819 write!(f, "{}", self.topic)
820 }
821}
822
823impl Debug for DataType {
824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825 write!(
826 f,
827 "DataType(type_name={}, metadata={:?}, identifier={:?})",
828 self.type_name, self.metadata, self.identifier
829 )
830 }
831}
832
833#[cfg(test)]
834mod tests {
835 use std::hash::DefaultHasher;
836
837 use rstest::*;
838 use serde_json::json;
839
840 use super::*;
841
842 fn params_from_json(value: serde_json::Value) -> Params {
843 serde_json::from_value(value).expect("valid Params JSON")
844 }
845
846 #[rstest]
847 fn test_data_type_creation_with_metadata() {
848 let metadata = Some(params_from_json(
849 json!({"key1": "value1", "key2": "value2"}),
850 ));
851 let data_type = DataType::new("ExampleType", metadata.clone(), None);
852
853 assert_eq!(data_type.type_name(), "ExampleType");
854 assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
855 assert_eq!(data_type.metadata(), metadata.as_ref());
856 }
857
858 #[rstest]
859 fn test_data_type_creation_without_metadata() {
860 let data_type = DataType::new("ExampleType", None, None);
861
862 assert_eq!(data_type.type_name(), "ExampleType");
863 assert_eq!(data_type.topic(), "ExampleType");
864 assert_eq!(data_type.metadata(), None);
865 }
866
867 #[rstest]
868 fn test_data_type_equality() {
869 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
870 let metadata2 = Some(params_from_json(json!({"key1": "value1"})));
871
872 let data_type1 = DataType::new("ExampleType", metadata1, None);
873 let data_type2 = DataType::new("ExampleType", metadata2, None);
874
875 assert_eq!(data_type1, data_type2);
876 }
877
878 #[rstest]
879 fn test_data_type_inequality() {
880 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
881 let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
882
883 let data_type1 = DataType::new("ExampleType", metadata1, None);
884 let data_type2 = DataType::new("ExampleType", metadata2, None);
885
886 assert_ne!(data_type1, data_type2);
887 }
888
889 #[rstest]
890 fn test_data_type_ordering() {
891 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
892 let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
893
894 let data_type1 = DataType::new("ExampleTypeA", metadata1, None);
895 let data_type2 = DataType::new("ExampleTypeB", metadata2, None);
896
897 assert!(data_type1 < data_type2);
898 }
899
900 #[rstest]
901 fn test_data_type_hash() {
902 let metadata = Some(params_from_json(json!({"key1": "value1"})));
903
904 let data_type1 = DataType::new("ExampleType", metadata.clone(), None);
905 let data_type2 = DataType::new("ExampleType", metadata, None);
906
907 let mut hasher1 = DefaultHasher::new();
908 data_type1.hash(&mut hasher1);
909 let hash1 = hasher1.finish();
910
911 let mut hasher2 = DefaultHasher::new();
912 data_type2.hash(&mut hasher2);
913 let hash2 = hasher2.finish();
914
915 assert_eq!(hash1, hash2);
916 }
917
918 #[rstest]
919 fn test_data_type_display() {
920 let metadata = Some(params_from_json(json!({"key1": "value1"})));
921 let data_type = DataType::new("ExampleType", metadata, None);
922
923 assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
924 }
925
926 #[rstest]
927 fn test_data_type_debug() {
928 let metadata = Some(params_from_json(json!({"key1": "value1"})));
929 let data_type = DataType::new("ExampleType", metadata.clone(), None);
930
931 assert_eq!(
932 format!("{data_type:?}"),
933 format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)")
934 );
935 }
936
937 #[rstest]
938 fn test_parse_instrument_id_from_metadata() {
939 let instrument_id_str = "MSFT.XNAS";
940 let metadata = Some(params_from_json(
941 json!({"instrument_id": instrument_id_str}),
942 ));
943 let data_type = DataType::new("InstrumentAny", metadata, None);
944
945 assert_eq!(
946 data_type.instrument_id().unwrap(),
947 InstrumentId::from_str(instrument_id_str).unwrap()
948 );
949 }
950
951 #[rstest]
952 fn test_parse_venue_from_metadata() {
953 let venue_str = "BINANCE";
954 let metadata = Some(params_from_json(json!({"venue": venue_str})));
955 let data_type = DataType::new(stringify!(InstrumentAny), metadata, None);
956
957 assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
958 }
959
960 #[rstest]
961 fn test_parse_start_from_metadata() {
962 let start_ns = 1_600_054_595_844_758_000;
963 let metadata = Some(params_from_json(json!({"start": start_ns.to_string()})));
964 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
965
966 assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns),);
967 }
968
969 #[rstest]
970 fn test_parse_end_from_metadata() {
971 let end_ns = 1_720_954_595_844_758_000;
972 let metadata = Some(params_from_json(json!({"end": end_ns.to_string()})));
973 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
974
975 assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns),);
976 }
977
978 #[rstest]
979 fn test_parse_limit_from_metadata() {
980 let limit = 1000;
981 let metadata = Some(params_from_json(json!({"limit": limit})));
982 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
983
984 assert_eq!(data_type.limit().unwrap(), limit);
985 }
986
987 #[rstest]
988 fn test_data_type_persistence_json_with_identifier() {
989 let data_type = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
990 let json = data_type.to_persistence_json().unwrap();
991 assert!(!json.contains("topic"));
992 assert!(json.contains("\"identifier\":\"venue//symbol\""));
993 let restored = DataType::from_persistence_json(&json).unwrap();
994 assert_eq!(restored.type_name(), "MyCustomType");
995 assert_eq!(restored.identifier(), Some("venue//symbol"));
996 assert_eq!(restored.topic(), "MyCustomType");
997 }
998
999 #[rstest]
1000 fn test_data_type_identifier_getter() {
1001 let data_type = DataType::new("T", None, Some("id".to_string()));
1002 assert_eq!(data_type.identifier(), Some("id"));
1003 let data_type_no_id = DataType::new("T", None, None);
1004 assert_eq!(data_type_no_id.identifier(), None);
1005 }
1006}