1pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41 fmt::{Debug, Display},
42 hash::{Hash, Hasher},
43 str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50#[rustfmt::skip] pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58 CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62 get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70 BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71 YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77#[cfg(feature = "arrow")]
78pub use registry::{
79 ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, encode_custom_to_arrow,
80 ensure_arrow_registered, get_arrow_schema, register_arrow,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84 PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85 ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86 register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use registry::{
89 deserialize_custom_from_json, ensure_json_deserializer_registered, register_json_deserializer,
90};
91pub use status::InstrumentStatus;
92pub use trade::TradeTick;
93
94use crate::identifiers::{InstrumentId, Venue};
95#[derive(Debug)]
100pub enum Data {
101 Delta(OrderBookDelta),
102 Deltas(OrderBookDeltas_API),
103 Depth10(Box<OrderBookDepth10>), Quote(QuoteTick),
105 Trade(TradeTick),
106 Bar(Bar),
107 MarkPriceUpdate(MarkPriceUpdate), IndexPriceUpdate(IndexPriceUpdate), InstrumentStatus(InstrumentStatus),
110 InstrumentClose(InstrumentClose),
111 Custom(CustomData),
112}
113
114#[cfg(feature = "ffi")]
119#[repr(C)]
120#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
121#[allow(non_camel_case_types)]
122pub enum DataFFI {
123 Delta(OrderBookDelta),
124 Deltas(OrderBookDeltas_API),
125 Depth10(Box<OrderBookDepth10>),
126 Quote(QuoteTick),
127 Trade(TradeTick),
128 Bar(Bar),
129 MarkPriceUpdate(MarkPriceUpdate),
130 IndexPriceUpdate(IndexPriceUpdate),
131 InstrumentClose(InstrumentClose),
132}
133
134#[cfg(feature = "ffi")]
135impl TryFrom<Data> for DataFFI {
136 type Error = anyhow::Error;
137
138 fn try_from(value: Data) -> Result<Self, Self::Error> {
139 match value {
140 Data::Delta(x) => Ok(Self::Delta(x)),
141 Data::Deltas(x) => Ok(Self::Deltas(x)),
142 Data::Depth10(x) => Ok(Self::Depth10(x)),
143 Data::Quote(x) => Ok(Self::Quote(x)),
144 Data::Trade(x) => Ok(Self::Trade(x)),
145 Data::Bar(x) => Ok(Self::Bar(x)),
146 Data::MarkPriceUpdate(x) => Ok(Self::MarkPriceUpdate(x)),
147 Data::IndexPriceUpdate(x) => Ok(Self::IndexPriceUpdate(x)),
148 Data::InstrumentStatus(_) => {
149 anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
150 }
151 Data::InstrumentClose(x) => Ok(Self::InstrumentClose(x)),
152 Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
153 }
154 }
155}
156
157#[cfg(feature = "ffi")]
158impl From<DataFFI> for Data {
159 fn from(value: DataFFI) -> Self {
160 match value {
161 DataFFI::Delta(x) => Self::Delta(x),
162 DataFFI::Deltas(x) => Self::Deltas(x),
163 DataFFI::Depth10(x) => Self::Depth10(x),
164 DataFFI::Quote(x) => Self::Quote(x),
165 DataFFI::Trade(x) => Self::Trade(x),
166 DataFFI::Bar(x) => Self::Bar(x),
167 DataFFI::MarkPriceUpdate(x) => Self::MarkPriceUpdate(x),
168 DataFFI::IndexPriceUpdate(x) => Self::IndexPriceUpdate(x),
169 DataFFI::InstrumentClose(x) => Self::InstrumentClose(x),
170 }
171 }
172}
173
174impl<'de> Deserialize<'de> for Data {
175 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
176 where
177 D: serde::Deserializer<'de>,
178 {
179 use serde::de::Error;
180 let value = serde_json::Value::deserialize(deserializer)?;
181 let type_name = value
182 .get("type")
183 .and_then(|v| v.as_str())
184 .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
185 .to_string();
186
187 match type_name.as_str() {
188 "OrderBookDelta" => Ok(Self::Delta(
189 serde_json::from_value(value).map_err(D::Error::custom)?,
190 )),
191 "OrderBookDeltas" => Ok(Self::Deltas(
192 serde_json::from_value(value).map_err(D::Error::custom)?,
193 )),
194 "OrderBookDepth10" => Ok(Self::Depth10(
195 serde_json::from_value(value).map_err(D::Error::custom)?,
196 )),
197 "QuoteTick" => Ok(Self::Quote(
198 serde_json::from_value(value).map_err(D::Error::custom)?,
199 )),
200 "TradeTick" => Ok(Self::Trade(
201 serde_json::from_value(value).map_err(D::Error::custom)?,
202 )),
203 "Bar" => Ok(Self::Bar(
204 serde_json::from_value(value).map_err(D::Error::custom)?,
205 )),
206 "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
207 serde_json::from_value(value).map_err(D::Error::custom)?,
208 )),
209 "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
210 serde_json::from_value(value).map_err(D::Error::custom)?,
211 )),
212 "InstrumentStatus" => Ok(Self::InstrumentStatus(
213 serde_json::from_value(value).map_err(D::Error::custom)?,
214 )),
215 "InstrumentClose" => Ok(Self::InstrumentClose(
216 serde_json::from_value(value).map_err(D::Error::custom)?,
217 )),
218 _ => {
219 if let Some(data) =
220 deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
221 {
222 Ok(data)
223 } else {
224 Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
225 }
226 }
227 }
228 }
229}
230
231impl Clone for Data {
232 fn clone(&self) -> Self {
233 match self {
234 Self::Delta(x) => Self::Delta(*x),
235 Self::Deltas(x) => Self::Deltas(x.clone()),
236 Self::Depth10(x) => Self::Depth10(x.clone()),
237 Self::Quote(x) => Self::Quote(*x),
238 Self::Trade(x) => Self::Trade(*x),
239 Self::Bar(x) => Self::Bar(*x),
240 Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
241 Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
242 Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
243 Self::InstrumentClose(x) => Self::InstrumentClose(*x),
244 Self::Custom(x) => Self::Custom(x.clone()),
245 }
246 }
247}
248
249impl PartialEq for Data {
250 fn eq(&self, other: &Self) -> bool {
251 match (self, other) {
252 (Self::Delta(a), Self::Delta(b)) => a == b,
253 (Self::Deltas(a), Self::Deltas(b)) => a == b,
254 (Self::Depth10(a), Self::Depth10(b)) => a == b,
255 (Self::Quote(a), Self::Quote(b)) => a == b,
256 (Self::Trade(a), Self::Trade(b)) => a == b,
257 (Self::Bar(a), Self::Bar(b)) => a == b,
258 (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
259 (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
260 (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
261 (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
262 (Self::Custom(a), Self::Custom(b)) => a == b,
263 _ => false,
264 }
265 }
266}
267
268impl Serialize for Data {
269 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
270 where
271 S: serde::Serializer,
272 {
273 match self {
274 Self::Delta(x) => x.serialize(serializer),
275 Self::Deltas(x) => x.serialize(serializer),
276 Self::Depth10(x) => x.serialize(serializer),
277 Self::Quote(x) => x.serialize(serializer),
278 Self::Trade(x) => x.serialize(serializer),
279 Self::Bar(x) => x.serialize(serializer),
280 Self::MarkPriceUpdate(x) => x.serialize(serializer),
281 Self::IndexPriceUpdate(x) => x.serialize(serializer),
282 Self::InstrumentStatus(x) => x.serialize(serializer),
283 Self::InstrumentClose(x) => x.serialize(serializer),
284 Self::Custom(x) => x.serialize(serializer),
285 }
286 }
287}
288
289macro_rules! impl_try_from_data {
290 ($variant:ident, $type:ty) => {
291 impl TryFrom<Data> for $type {
292 type Error = ();
293
294 fn try_from(value: Data) -> Result<Self, Self::Error> {
295 match value {
296 Data::$variant(x) => Ok(x),
297 _ => Err(()),
298 }
299 }
300 }
301 };
302}
303
304impl TryFrom<Data> for OrderBookDepth10 {
305 type Error = ();
306
307 fn try_from(value: Data) -> Result<Self, Self::Error> {
308 match value {
309 Data::Depth10(x) => Ok(*x),
310 _ => Err(()),
311 }
312 }
313}
314
315impl_try_from_data!(Quote, QuoteTick);
316impl_try_from_data!(Delta, OrderBookDelta);
317impl_try_from_data!(Deltas, OrderBookDeltas_API);
318impl_try_from_data!(Trade, TradeTick);
319impl_try_from_data!(Bar, Bar);
320impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
321impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
322impl_try_from_data!(InstrumentStatus, InstrumentStatus);
323impl_try_from_data!(InstrumentClose, InstrumentClose);
324
325#[must_use]
330pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
331 data.into_iter()
332 .filter_map(|d| T::try_from(d).ok())
333 .collect()
334}
335
336impl Data {
337 #[must_use]
339 pub fn instrument_id(&self) -> InstrumentId {
340 match self {
341 Self::Delta(delta) => delta.instrument_id,
342 Self::Deltas(deltas) => deltas.instrument_id,
343 Self::Depth10(depth) => depth.instrument_id,
344 Self::Quote(quote) => quote.instrument_id,
345 Self::Trade(trade) => trade.instrument_id,
346 Self::Bar(bar) => bar.bar_type.instrument_id(),
347 Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
348 Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
349 Self::InstrumentStatus(status) => status.instrument_id,
350 Self::InstrumentClose(close) => close.instrument_id,
351 Self::Custom(custom) => custom
352 .data_type
353 .identifier()
354 .and_then(|s| InstrumentId::from_str(s).ok())
355 .or_else(|| {
356 custom
357 .data_type
358 .metadata()
359 .and_then(|m| m.get_str("instrument_id"))
360 .and_then(|s| InstrumentId::from_str(s).ok())
361 })
362 .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
363 }
364 }
365
366 #[must_use]
368 pub fn is_order_book_data(&self) -> bool {
369 matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
370 }
371}
372
373pub trait HasTsInit {
379 fn ts_init(&self) -> UnixNanos;
381}
382
383pub trait CatalogPathPrefix {
385 fn path_prefix() -> &'static str;
387}
388
389#[macro_export]
399macro_rules! impl_catalog_path_prefix {
400 ($type:ty, $path:expr) => {
401 impl $crate::data::CatalogPathPrefix for $type {
402 fn path_prefix() -> &'static str {
403 $path
404 }
405 }
406 };
407}
408
409impl_catalog_path_prefix!(QuoteTick, "quotes");
411impl_catalog_path_prefix!(TradeTick, "trades");
412impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
413impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
414impl_catalog_path_prefix!(Bar, "bars");
415impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
416impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
417impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
418impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
419impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
420
421use crate::instruments::InstrumentAny;
422impl_catalog_path_prefix!(InstrumentAny, "instruments");
423
424impl HasTsInit for Data {
425 fn ts_init(&self) -> UnixNanos {
426 match self {
427 Self::Delta(d) => d.ts_init,
428 Self::Deltas(d) => d.ts_init,
429 Self::Depth10(d) => d.ts_init,
430 Self::Quote(q) => q.ts_init,
431 Self::Trade(t) => t.ts_init,
432 Self::Bar(b) => b.ts_init,
433 Self::MarkPriceUpdate(p) => p.ts_init,
434 Self::IndexPriceUpdate(p) => p.ts_init,
435 Self::InstrumentStatus(s) => s.ts_init,
436 Self::InstrumentClose(c) => c.ts_init,
437 Self::Custom(c) => c.data.ts_init(),
438 }
439 }
440}
441
442pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
446 data.array_windows()
447 .all(|[a, b]| a.ts_init() <= b.ts_init())
448}
449
450impl From<OrderBookDelta> for Data {
451 fn from(value: OrderBookDelta) -> Self {
452 Self::Delta(value)
453 }
454}
455
456impl From<OrderBookDeltas_API> for Data {
457 fn from(value: OrderBookDeltas_API) -> Self {
458 Self::Deltas(value)
459 }
460}
461
462impl From<OrderBookDepth10> for Data {
463 fn from(value: OrderBookDepth10) -> Self {
464 Self::Depth10(Box::new(value))
465 }
466}
467
468impl From<QuoteTick> for Data {
469 fn from(value: QuoteTick) -> Self {
470 Self::Quote(value)
471 }
472}
473
474impl From<TradeTick> for Data {
475 fn from(value: TradeTick) -> Self {
476 Self::Trade(value)
477 }
478}
479
480impl From<Bar> for Data {
481 fn from(value: Bar) -> Self {
482 Self::Bar(value)
483 }
484}
485
486impl From<MarkPriceUpdate> for Data {
487 fn from(value: MarkPriceUpdate) -> Self {
488 Self::MarkPriceUpdate(value)
489 }
490}
491
492impl From<IndexPriceUpdate> for Data {
493 fn from(value: IndexPriceUpdate) -> Self {
494 Self::IndexPriceUpdate(value)
495 }
496}
497
498impl From<InstrumentStatus> for Data {
499 fn from(value: InstrumentStatus) -> Self {
500 Self::InstrumentStatus(value)
501 }
502}
503
504impl From<InstrumentClose> for Data {
505 fn from(value: InstrumentClose) -> Self {
506 Self::InstrumentClose(value)
507 }
508}
509
510fn value_to_topic_string(v: &JsonValue) -> String {
512 if let Some(s) = v.as_str() {
513 return s.to_string();
514 }
515
516 if let Some(n) = v.as_u64() {
517 return n.to_string();
518 }
519
520 if let Some(n) = v.as_i64() {
521 return n.to_string();
522 }
523
524 if let Some(b) = v.as_bool() {
525 return b.to_string();
526 }
527
528 if let Some(f) = v.as_f64() {
529 return f.to_string();
530 }
531
532 if v.is_null() {
533 return "null".to_string();
534 }
535 serde_json::to_string(v).unwrap_or_default()
536}
537
538fn params_to_topic_suffix(params: &Params) -> String {
540 params
541 .iter()
542 .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
543 .collect::<Vec<_>>()
544 .join(".")
545}
546
547#[derive(Clone, Serialize, Deserialize)]
549#[cfg_attr(
550 feature = "python",
551 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
552)]
553#[cfg_attr(
554 feature = "python",
555 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
556)]
557pub struct DataType {
558 type_name: String,
559 metadata: Option<Params>,
560 topic: String,
561 hash: u64,
562 identifier: Option<String>,
563}
564
565impl DataType {
566 #[must_use]
568 pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
569 let topic = if let Some(ref meta) = metadata {
571 if meta.is_empty() {
572 type_name.to_string()
573 } else {
574 format!("{type_name}.{}", params_to_topic_suffix(meta))
575 }
576 } else {
577 type_name.to_string()
578 };
579
580 let mut hasher = std::collections::hash_map::DefaultHasher::new();
582 topic.hash(&mut hasher);
583
584 Self {
585 type_name: type_name.to_owned(),
586 metadata,
587 topic,
588 hash: hasher.finish(),
589 identifier,
590 }
591 }
592
593 #[must_use]
597 pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
598 let mut hasher = std::collections::hash_map::DefaultHasher::new();
599 topic.hash(&mut hasher);
600 Self {
601 type_name: type_name.to_owned(),
602 metadata,
603 topic: topic.to_owned(),
604 hash: hasher.finish(),
605 identifier: None,
606 }
607 }
608
609 pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
615 let mut map = serde_json::Map::new();
616 map.insert(
617 "type_name".to_string(),
618 serde_json::Value::String(self.type_name.clone()),
619 );
620 map.insert(
621 "metadata".to_string(),
622 self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
623 serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
624 }),
625 );
626
627 if let Some(ref id) = self.identifier {
628 map.insert(
629 "identifier".to_string(),
630 serde_json::Value::String(id.clone()),
631 );
632 }
633 serde_json::to_string(&serde_json::Value::Object(map))
634 }
635
636 pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
643 let value: serde_json::Value =
644 serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
645 let obj = value
646 .as_object()
647 .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
648 let type_name = obj
649 .get("type_name")
650 .and_then(|v| v.as_str())
651 .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
652 .to_string();
653 let metadata = obj.get("metadata").and_then(|m| {
654 if m.is_null() {
655 None
656 } else {
657 let p: Params = serde_json::from_value(m.clone()).ok()?;
658 if p.is_empty() { None } else { Some(p) }
659 }
660 });
661 let identifier = obj
662 .get("identifier")
663 .and_then(|v| v.as_str())
664 .map(String::from);
665 Ok(Self::new(&type_name, metadata, identifier))
666 }
667
668 #[must_use]
670 pub fn type_name(&self) -> &str {
671 self.type_name.as_str()
672 }
673
674 #[must_use]
676 pub fn metadata(&self) -> Option<&Params> {
677 self.metadata.as_ref()
678 }
679
680 #[must_use]
682 pub fn metadata_str(&self) -> String {
683 self.metadata.as_ref().map_or_else(
684 || "null".to_string(),
685 |metadata| to_string(metadata).unwrap_or_default(),
686 )
687 }
688
689 #[must_use]
691 pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
692 self.metadata.as_ref().map(|p| {
693 p.iter()
694 .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
695 .collect()
696 })
697 }
698
699 #[must_use]
701 pub fn precomputed_hash(&self) -> u64 {
702 self.hash
703 }
704
705 #[must_use]
707 pub fn topic(&self) -> &str {
708 self.topic.as_str()
709 }
710
711 #[must_use]
713 pub fn identifier(&self) -> Option<&str> {
714 self.identifier.as_deref()
715 }
716
717 #[must_use]
725 pub fn instrument_id(&self) -> Option<InstrumentId> {
726 let metadata = self.metadata.as_ref().expect("metadata was `None`");
727 let instrument_id = metadata.get_str("instrument_id")?;
728 Some(
729 InstrumentId::from_str(instrument_id)
730 .expect("Invalid `InstrumentId` for 'instrument_id'"),
731 )
732 }
733
734 #[must_use]
742 pub fn venue(&self) -> Option<Venue> {
743 let metadata = self.metadata.as_ref().expect("metadata was `None`");
744 let venue_str = metadata.get_str("venue")?;
745 Some(Venue::from(venue_str))
746 }
747
748 #[must_use]
756 pub fn start(&self) -> Option<UnixNanos> {
757 let metadata = self.metadata.as_ref()?;
758 let start_str = metadata.get_str("start")?;
759 Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
760 }
761
762 #[must_use]
770 pub fn end(&self) -> Option<UnixNanos> {
771 let metadata = self.metadata.as_ref()?;
772 let end_str = metadata.get_str("end")?;
773 Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
774 }
775
776 #[must_use]
784 pub fn limit(&self) -> Option<usize> {
785 let metadata = self.metadata.as_ref()?;
786 metadata.get_usize("limit").or_else(|| {
787 metadata
788 .get_str("limit")
789 .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
790 })
791 }
792}
793
794impl PartialEq for DataType {
795 fn eq(&self, other: &Self) -> bool {
796 self.topic == other.topic
797 }
798}
799
800impl Eq for DataType {}
801
802impl PartialOrd for DataType {
803 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
804 Some(self.cmp(other))
805 }
806}
807
808impl Ord for DataType {
809 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
810 self.topic.cmp(&other.topic)
811 }
812}
813
814impl Hash for DataType {
815 fn hash<H: Hasher>(&self, state: &mut H) {
816 self.hash.hash(state);
817 }
818}
819
820impl Display for DataType {
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 write!(f, "{}", self.topic)
823 }
824}
825
826impl Debug for DataType {
827 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
828 write!(
829 f,
830 "DataType(type_name={}, metadata={:?}, identifier={:?})",
831 self.type_name, self.metadata, self.identifier
832 )
833 }
834}
835
836#[cfg(test)]
837mod tests {
838 use std::hash::DefaultHasher;
839
840 use rstest::*;
841 use serde_json::json;
842
843 use super::*;
844
845 fn params_from_json(value: serde_json::Value) -> Params {
846 serde_json::from_value(value).expect("valid Params JSON")
847 }
848
849 #[rstest]
850 fn test_data_type_creation_with_metadata() {
851 let metadata = Some(params_from_json(
852 json!({"key1": "value1", "key2": "value2"}),
853 ));
854 let data_type = DataType::new("ExampleType", metadata.clone(), None);
855
856 assert_eq!(data_type.type_name(), "ExampleType");
857 assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
858 assert_eq!(data_type.metadata(), metadata.as_ref());
859 }
860
861 #[rstest]
862 fn test_data_type_creation_without_metadata() {
863 let data_type = DataType::new("ExampleType", None, None);
864
865 assert_eq!(data_type.type_name(), "ExampleType");
866 assert_eq!(data_type.topic(), "ExampleType");
867 assert_eq!(data_type.metadata(), None);
868 }
869
870 #[rstest]
871 fn test_data_type_equality() {
872 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
873 let metadata2 = Some(params_from_json(json!({"key1": "value1"})));
874
875 let data_type1 = DataType::new("ExampleType", metadata1, None);
876 let data_type2 = DataType::new("ExampleType", metadata2, None);
877
878 assert_eq!(data_type1, data_type2);
879 }
880
881 #[rstest]
882 fn test_data_type_inequality() {
883 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
884 let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
885
886 let data_type1 = DataType::new("ExampleType", metadata1, None);
887 let data_type2 = DataType::new("ExampleType", metadata2, None);
888
889 assert_ne!(data_type1, data_type2);
890 }
891
892 #[rstest]
893 fn test_data_type_ordering() {
894 let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
895 let metadata2 = Some(params_from_json(json!({"key2": "value2"})));
896
897 let data_type1 = DataType::new("ExampleTypeA", metadata1, None);
898 let data_type2 = DataType::new("ExampleTypeB", metadata2, None);
899
900 assert!(data_type1 < data_type2);
901 }
902
903 #[rstest]
904 fn test_data_type_hash() {
905 let metadata = Some(params_from_json(json!({"key1": "value1"})));
906
907 let data_type1 = DataType::new("ExampleType", metadata.clone(), None);
908 let data_type2 = DataType::new("ExampleType", metadata, None);
909
910 let mut hasher1 = DefaultHasher::new();
911 data_type1.hash(&mut hasher1);
912 let hash1 = hasher1.finish();
913
914 let mut hasher2 = DefaultHasher::new();
915 data_type2.hash(&mut hasher2);
916 let hash2 = hasher2.finish();
917
918 assert_eq!(hash1, hash2);
919 }
920
921 #[rstest]
922 fn test_data_type_display() {
923 let metadata = Some(params_from_json(json!({"key1": "value1"})));
924 let data_type = DataType::new("ExampleType", metadata, None);
925
926 assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
927 }
928
929 #[rstest]
930 fn test_data_type_debug() {
931 let metadata = Some(params_from_json(json!({"key1": "value1"})));
932 let data_type = DataType::new("ExampleType", metadata.clone(), None);
933
934 assert_eq!(
935 format!("{data_type:?}"),
936 format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)")
937 );
938 }
939
940 #[rstest]
941 fn test_parse_instrument_id_from_metadata() {
942 let instrument_id_str = "MSFT.XNAS";
943 let metadata = Some(params_from_json(
944 json!({"instrument_id": instrument_id_str}),
945 ));
946 let data_type = DataType::new("InstrumentAny", metadata, None);
947
948 assert_eq!(
949 data_type.instrument_id().unwrap(),
950 InstrumentId::from_str(instrument_id_str).unwrap()
951 );
952 }
953
954 #[rstest]
955 fn test_parse_venue_from_metadata() {
956 let venue_str = "BINANCE";
957 let metadata = Some(params_from_json(json!({"venue": venue_str})));
958 let data_type = DataType::new(stringify!(InstrumentAny), metadata, None);
959
960 assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
961 }
962
963 #[rstest]
964 fn test_parse_start_from_metadata() {
965 let start_ns = 1_600_054_595_844_758_000;
966 let metadata = Some(params_from_json(json!({"start": start_ns.to_string()})));
967 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
968
969 assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns),);
970 }
971
972 #[rstest]
973 fn test_parse_end_from_metadata() {
974 let end_ns = 1_720_954_595_844_758_000;
975 let metadata = Some(params_from_json(json!({"end": end_ns.to_string()})));
976 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
977
978 assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns),);
979 }
980
981 #[rstest]
982 fn test_parse_limit_from_metadata() {
983 let limit = 1000;
984 let metadata = Some(params_from_json(json!({"limit": limit})));
985 let data_type = DataType::new(stringify!(TradeTick), metadata, None);
986
987 assert_eq!(data_type.limit().unwrap(), limit);
988 }
989
990 #[rstest]
991 fn test_data_type_persistence_json_with_identifier() {
992 let data_type = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
993 let json = data_type.to_persistence_json().unwrap();
994 assert!(!json.contains("topic"));
995 assert!(json.contains("\"identifier\":\"venue//symbol\""));
996 let restored = DataType::from_persistence_json(&json).unwrap();
997 assert_eq!(restored.type_name(), "MyCustomType");
998 assert_eq!(restored.identifier(), Some("venue//symbol"));
999 assert_eq!(restored.topic(), "MyCustomType");
1000 }
1001
1002 #[rstest]
1003 fn test_data_type_identifier_getter() {
1004 let data_type = DataType::new("T", None, Some("id".to_string()));
1005 assert_eq!(data_type.identifier(), Some("id"));
1006 let data_type_no_id = DataType::new("T", None, None);
1007 assert_eq!(data_type_no_id.identifier(), None);
1008 }
1009}