1pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41 fmt::{Debug, Display},
42 hash::{Hash, Hasher},
43 str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::{Value as JsonValue, to_string};
49
50#[rustfmt::skip] pub use bar::{Bar, BarSpecification, BarType};
53pub use black_scholes::Greeks;
54pub use close::InstrumentClose;
55#[cfg(feature = "python")]
56pub use custom::PythonCustomDataWrapper;
57pub use custom::{
58 CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
59};
60#[cfg(feature = "python")]
61pub use custom::{
62 get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
63};
64pub use delta::OrderBookDelta;
65pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
66pub use depth::{DEPTH10_LEN, OrderBookDepth10};
67pub use forward::ForwardPrice;
68pub use funding::FundingRateUpdate;
69pub use greeks::{
70 BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
71 YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
72};
73pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
74pub use order::{BookOrder, NULL_ORDER};
75pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
76pub use quote::QuoteTick;
77pub use registry::{
78 ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, deserialize_custom_from_json,
79 encode_custom_to_arrow, ensure_arrow_registered, ensure_json_deserializer_registered,
80 get_arrow_schema, register_arrow, register_json_deserializer,
81};
82#[cfg(feature = "python")]
83pub use registry::{
84 PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
85 ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
86 register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
87};
88pub use status::InstrumentStatus;
89pub use trade::TradeTick;
90
91use crate::identifiers::{InstrumentId, Venue};
/// A single data item, tagged by the concrete payload it carries.
///
/// `Clone`, `PartialEq`, `Serialize` and `Deserialize` are implemented by hand for this
/// enum elsewhere in this file rather than derived.
#[derive(Debug)]
pub enum Data {
    /// An [`OrderBookDelta`] payload.
    Delta(OrderBookDelta),
    /// An [`OrderBookDeltas_API`] payload.
    Deltas(OrderBookDeltas_API),
    /// An [`OrderBookDepth10`] payload, stored behind a `Box`
    /// (presumably to keep the enum itself small; confirm against the payload size).
    Depth10(Box<OrderBookDepth10>),
    /// A [`QuoteTick`] payload.
    Quote(QuoteTick),
    /// A [`TradeTick`] payload.
    Trade(TradeTick),
    /// A [`Bar`] payload.
    Bar(Bar),
    /// A [`MarkPriceUpdate`] payload.
    MarkPriceUpdate(MarkPriceUpdate),
    /// An [`IndexPriceUpdate`] payload.
    IndexPriceUpdate(IndexPriceUpdate),
    /// An [`InstrumentClose`] payload.
    InstrumentClose(InstrumentClose),
    /// A user-defined [`CustomData`] payload.
    Custom(CustomData),
}
109
/// Counterpart of [`Data`] that is only compiled with the `ffi` feature.
///
/// It has the same variants as [`Data`] except `Custom`, and is declared `#[repr(C)]`.
/// Conversions to and from [`Data`] are provided by the `TryFrom<Data>` and
/// `From<DataFFI>` impls in this file.
#[cfg(feature = "ffi")]
#[repr(C)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
pub enum DataFFI {
    /// An [`OrderBookDelta`] payload.
    Delta(OrderBookDelta),
    /// An [`OrderBookDeltas_API`] payload.
    Deltas(OrderBookDeltas_API),
    /// A boxed [`OrderBookDepth10`] payload.
    Depth10(Box<OrderBookDepth10>),
    /// A [`QuoteTick`] payload.
    Quote(QuoteTick),
    /// A [`TradeTick`] payload.
    Trade(TradeTick),
    /// A [`Bar`] payload.
    Bar(Bar),
    /// A [`MarkPriceUpdate`] payload.
    MarkPriceUpdate(MarkPriceUpdate),
    /// An [`IndexPriceUpdate`] payload.
    IndexPriceUpdate(IndexPriceUpdate),
    /// An [`InstrumentClose`] payload.
    InstrumentClose(InstrumentClose),
}
129
#[cfg(feature = "ffi")]
impl TryFrom<Data> for DataFFI {
    type Error = anyhow::Error;

    /// Maps each built-in [`Data`] variant onto the [`DataFFI`] variant of the same name.
    ///
    /// # Errors
    ///
    /// Returns an error for `Data::Custom`, which has no [`DataFFI`] counterpart.
    fn try_from(value: Data) -> Result<Self, Self::Error> {
        let converted = match value {
            Data::Delta(delta) => Self::Delta(delta),
            Data::Deltas(deltas) => Self::Deltas(deltas),
            Data::Depth10(depth) => Self::Depth10(depth),
            Data::Quote(quote) => Self::Quote(quote),
            Data::Trade(trade) => Self::Trade(trade),
            Data::Bar(bar) => Self::Bar(bar),
            Data::MarkPriceUpdate(mark) => Self::MarkPriceUpdate(mark),
            Data::IndexPriceUpdate(index) => Self::IndexPriceUpdate(index),
            Data::InstrumentClose(close) => Self::InstrumentClose(close),
            // `bail!` returns early with the error, so this arm never yields a value.
            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
        };
        Ok(converted)
    }
}
149
#[cfg(feature = "ffi")]
impl From<DataFFI> for Data {
    /// Maps every [`DataFFI`] variant onto the [`Data`] variant of the same name.
    fn from(value: DataFFI) -> Self {
        match value {
            DataFFI::Delta(delta) => Data::Delta(delta),
            DataFFI::Deltas(deltas) => Data::Deltas(deltas),
            DataFFI::Depth10(depth) => Data::Depth10(depth),
            DataFFI::Quote(quote) => Data::Quote(quote),
            DataFFI::Trade(trade) => Data::Trade(trade),
            DataFFI::Bar(bar) => Data::Bar(bar),
            DataFFI::MarkPriceUpdate(mark) => Data::MarkPriceUpdate(mark),
            DataFFI::IndexPriceUpdate(index) => Data::IndexPriceUpdate(index),
            DataFFI::InstrumentClose(close) => Data::InstrumentClose(close),
        }
    }
}
166
167impl<'de> Deserialize<'de> for Data {
168 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
169 where
170 D: serde::Deserializer<'de>,
171 {
172 use serde::de::Error;
173 let value = serde_json::Value::deserialize(deserializer)?;
174 let type_name = value
175 .get("type")
176 .and_then(|v| v.as_str())
177 .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
178 .to_string();
179
180 match type_name.as_str() {
181 "OrderBookDelta" => Ok(Self::Delta(
182 serde_json::from_value(value).map_err(D::Error::custom)?,
183 )),
184 "OrderBookDeltas" => Ok(Self::Deltas(
185 serde_json::from_value(value).map_err(D::Error::custom)?,
186 )),
187 "OrderBookDepth10" => Ok(Self::Depth10(
188 serde_json::from_value(value).map_err(D::Error::custom)?,
189 )),
190 "QuoteTick" => Ok(Self::Quote(
191 serde_json::from_value(value).map_err(D::Error::custom)?,
192 )),
193 "TradeTick" => Ok(Self::Trade(
194 serde_json::from_value(value).map_err(D::Error::custom)?,
195 )),
196 "Bar" => Ok(Self::Bar(
197 serde_json::from_value(value).map_err(D::Error::custom)?,
198 )),
199 "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
200 serde_json::from_value(value).map_err(D::Error::custom)?,
201 )),
202 "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
203 serde_json::from_value(value).map_err(D::Error::custom)?,
204 )),
205 "InstrumentClose" => Ok(Self::InstrumentClose(
206 serde_json::from_value(value).map_err(D::Error::custom)?,
207 )),
208 _ => {
209 if let Some(data) =
210 deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
211 {
212 Ok(data)
213 } else {
214 Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
215 }
216 }
217 }
218 }
219}
220
221impl Clone for Data {
222 fn clone(&self) -> Self {
223 match self {
224 Self::Delta(x) => Self::Delta(*x),
225 Self::Deltas(x) => Self::Deltas(x.clone()),
226 Self::Depth10(x) => Self::Depth10(x.clone()),
227 Self::Quote(x) => Self::Quote(*x),
228 Self::Trade(x) => Self::Trade(*x),
229 Self::Bar(x) => Self::Bar(*x),
230 Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
231 Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
232 Self::InstrumentClose(x) => Self::InstrumentClose(*x),
233 Self::Custom(x) => Self::Custom(x.clone()),
234 }
235 }
236}
237
238impl PartialEq for Data {
239 fn eq(&self, other: &Self) -> bool {
240 match (self, other) {
241 (Self::Delta(a), Self::Delta(b)) => a == b,
242 (Self::Deltas(a), Self::Deltas(b)) => a == b,
243 (Self::Depth10(a), Self::Depth10(b)) => a == b,
244 (Self::Quote(a), Self::Quote(b)) => a == b,
245 (Self::Trade(a), Self::Trade(b)) => a == b,
246 (Self::Bar(a), Self::Bar(b)) => a == b,
247 (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
248 (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
249 (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
250 (Self::Custom(a), Self::Custom(b)) => a == b,
251 _ => false,
252 }
253 }
254}
255
256impl Serialize for Data {
257 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
258 where
259 S: serde::Serializer,
260 {
261 match self {
262 Self::Delta(x) => x.serialize(serializer),
263 Self::Deltas(x) => x.serialize(serializer),
264 Self::Depth10(x) => x.serialize(serializer),
265 Self::Quote(x) => x.serialize(serializer),
266 Self::Trade(x) => x.serialize(serializer),
267 Self::Bar(x) => x.serialize(serializer),
268 Self::MarkPriceUpdate(x) => x.serialize(serializer),
269 Self::IndexPriceUpdate(x) => x.serialize(serializer),
270 Self::InstrumentClose(x) => x.serialize(serializer),
271 Self::Custom(x) => x.serialize(serializer),
272 }
273 }
274}
275
/// Implements `TryFrom<Data>` for a payload type.
///
/// The first argument is the `Data` variant to extract and the second is the payload type
/// that variant holds. The conversion yields the payload for that variant and `Err(())`
/// for every other variant.
macro_rules! impl_try_from_data {
    ($variant:ident, $type:ty) => {
        impl TryFrom<Data> for $type {
            type Error = ();

            fn try_from(value: Data) -> Result<Self, Self::Error> {
                if let Data::$variant(payload) = value {
                    Ok(payload)
                } else {
                    Err(())
                }
            }
        }
    };
}
290
291impl TryFrom<Data> for OrderBookDepth10 {
292 type Error = ();
293
294 fn try_from(value: Data) -> Result<Self, Self::Error> {
295 match value {
296 Data::Depth10(x) => Ok(*x),
297 _ => Err(()),
298 }
299 }
300}
301
// `TryFrom<Data>` for each payload type that is stored unboxed in its `Data` variant.
// `OrderBookDepth10` is stored boxed and has a hand-written impl instead.
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);
impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
impl_try_from_data!(InstrumentClose, InstrumentClose);
310
311pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
316 data.into_iter()
317 .filter_map(|d| T::try_from(d).ok())
318 .collect()
319}
320
321impl Data {
322 pub fn instrument_id(&self) -> InstrumentId {
324 match self {
325 Self::Delta(delta) => delta.instrument_id,
326 Self::Deltas(deltas) => deltas.instrument_id,
327 Self::Depth10(depth) => depth.instrument_id,
328 Self::Quote(quote) => quote.instrument_id,
329 Self::Trade(trade) => trade.instrument_id,
330 Self::Bar(bar) => bar.bar_type.instrument_id(),
331 Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
332 Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
333 Self::InstrumentClose(close) => close.instrument_id,
334 Self::Custom(custom) => custom
335 .data_type
336 .identifier()
337 .and_then(|s| InstrumentId::from_str(s).ok())
338 .or_else(|| {
339 custom
340 .data_type
341 .metadata()
342 .and_then(|m| m.get_str("instrument_id"))
343 .and_then(|s| InstrumentId::from_str(s).ok())
344 })
345 .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
346 }
347 }
348
349 pub fn is_order_book_data(&self) -> bool {
351 matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
352 }
353}
354
/// Implemented by types that expose a `ts_init` timestamp.
pub trait HasTsInit {
    /// Returns the `ts_init` timestamp of this value as [`UnixNanos`].
    // NOTE(review): presumably the UNIX-nanosecond time at which the value was initialized; confirm.
    fn ts_init(&self) -> UnixNanos;
}
364
/// Associates a type with a static path prefix string.
// NOTE(review): presumably used to build directory names in a data catalog; confirm against callers.
pub trait CatalogPathPrefix {
    /// Returns the path prefix for this type.
    fn path_prefix() -> &'static str;
}
370
/// Implements `CatalogPathPrefix` for a type.
///
/// The first argument is the implementing type and the second is the expression returned
/// by `path_prefix()`, which must evaluate to a `&'static str`.
#[macro_export]
macro_rules! impl_catalog_path_prefix {
    ($target:ty, $prefix:expr) => {
        impl $crate::data::CatalogPathPrefix for $target {
            fn path_prefix() -> &'static str {
                $prefix
            }
        }
    };
}
390
// Path prefixes for the built-in data types.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

use crate::instruments::InstrumentAny;
// Instruments get a path prefix as well.
impl_catalog_path_prefix!(InstrumentAny, "instruments");
403
404impl HasTsInit for Data {
405 fn ts_init(&self) -> UnixNanos {
406 match self {
407 Self::Delta(d) => d.ts_init,
408 Self::Deltas(d) => d.ts_init,
409 Self::Depth10(d) => d.ts_init,
410 Self::Quote(q) => q.ts_init,
411 Self::Trade(t) => t.ts_init,
412 Self::Bar(b) => b.ts_init,
413 Self::MarkPriceUpdate(p) => p.ts_init,
414 Self::IndexPriceUpdate(p) => p.ts_init,
415 Self::InstrumentClose(c) => c.ts_init,
416 Self::Custom(c) => c.data.ts_init(),
417 }
418 }
419}
420
421pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
425 data.array_windows()
426 .all(|[a, b]| a.ts_init() <= b.ts_init())
427}
428
429impl From<OrderBookDelta> for Data {
430 fn from(value: OrderBookDelta) -> Self {
431 Self::Delta(value)
432 }
433}
434
435impl From<OrderBookDeltas_API> for Data {
436 fn from(value: OrderBookDeltas_API) -> Self {
437 Self::Deltas(value)
438 }
439}
440
441impl From<OrderBookDepth10> for Data {
442 fn from(value: OrderBookDepth10) -> Self {
443 Self::Depth10(Box::new(value))
444 }
445}
446
447impl From<QuoteTick> for Data {
448 fn from(value: QuoteTick) -> Self {
449 Self::Quote(value)
450 }
451}
452
453impl From<TradeTick> for Data {
454 fn from(value: TradeTick) -> Self {
455 Self::Trade(value)
456 }
457}
458
459impl From<Bar> for Data {
460 fn from(value: Bar) -> Self {
461 Self::Bar(value)
462 }
463}
464
465impl From<MarkPriceUpdate> for Data {
466 fn from(value: MarkPriceUpdate) -> Self {
467 Self::MarkPriceUpdate(value)
468 }
469}
470
471impl From<IndexPriceUpdate> for Data {
472 fn from(value: IndexPriceUpdate) -> Self {
473 Self::IndexPriceUpdate(value)
474 }
475}
476
477impl From<InstrumentClose> for Data {
478 fn from(value: InstrumentClose) -> Self {
479 Self::InstrumentClose(value)
480 }
481}
482
483fn value_to_topic_string(v: &JsonValue) -> String {
485 if let Some(s) = v.as_str() {
486 return s.to_string();
487 }
488
489 if let Some(n) = v.as_u64() {
490 return n.to_string();
491 }
492
493 if let Some(n) = v.as_i64() {
494 return n.to_string();
495 }
496
497 if let Some(b) = v.as_bool() {
498 return b.to_string();
499 }
500
501 if let Some(f) = v.as_f64() {
502 return f.to_string();
503 }
504
505 if v.is_null() {
506 return "null".to_string();
507 }
508 serde_json::to_string(v).unwrap_or_default()
509}
510
511fn params_to_topic_suffix(params: &Params) -> String {
513 params
514 .iter()
515 .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
516 .collect::<Vec<_>>()
517 .join(".")
518}
519
/// Describes a kind of data by type name, optional metadata and optional identifier.
///
/// Equality, ordering and hashing for this type are implemented by hand in this file and
/// depend only on `topic` (hashing uses the precomputed `hash`).
#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct DataType {
    /// Name of the data type.
    type_name: String,
    /// Optional key/value metadata.
    metadata: Option<Params>,
    /// Topic string; `DataType::new` derives it from `type_name` and `metadata`.
    topic: String,
    /// Hash of `topic`, computed once at construction.
    hash: u64,
    /// Optional identifier; not part of the topic.
    identifier: Option<String>,
}
537
538impl DataType {
539 pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
541 let topic = if let Some(ref meta) = metadata {
543 if meta.is_empty() {
544 type_name.to_string()
545 } else {
546 format!("{type_name}.{}", params_to_topic_suffix(meta))
547 }
548 } else {
549 type_name.to_string()
550 };
551
552 let mut hasher = std::collections::hash_map::DefaultHasher::new();
554 topic.hash(&mut hasher);
555
556 Self {
557 type_name: type_name.to_owned(),
558 metadata,
559 topic,
560 hash: hasher.finish(),
561 identifier,
562 }
563 }
564
565 pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
569 let mut hasher = std::collections::hash_map::DefaultHasher::new();
570 topic.hash(&mut hasher);
571 Self {
572 type_name: type_name.to_owned(),
573 metadata,
574 topic: topic.to_owned(),
575 hash: hasher.finish(),
576 identifier: None,
577 }
578 }
579
580 pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
586 let mut map = serde_json::Map::new();
587 map.insert(
588 "type_name".to_string(),
589 serde_json::Value::String(self.type_name.clone()),
590 );
591 map.insert(
592 "metadata".to_string(),
593 self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
594 serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
595 }),
596 );
597
598 if let Some(ref id) = self.identifier {
599 map.insert(
600 "identifier".to_string(),
601 serde_json::Value::String(id.clone()),
602 );
603 }
604 serde_json::to_string(&serde_json::Value::Object(map))
605 }
606
607 pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
614 let value: serde_json::Value =
615 serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
616 let obj = value
617 .as_object()
618 .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
619 let type_name = obj
620 .get("type_name")
621 .and_then(|v| v.as_str())
622 .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
623 .to_string();
624 let metadata = obj.get("metadata").and_then(|m| {
625 if m.is_null() {
626 None
627 } else {
628 let p: Params = serde_json::from_value(m.clone()).ok()?;
629 if p.is_empty() { None } else { Some(p) }
630 }
631 });
632 let identifier = obj
633 .get("identifier")
634 .and_then(|v| v.as_str())
635 .map(String::from);
636 Ok(Self::new(&type_name, metadata, identifier))
637 }
638
639 pub fn type_name(&self) -> &str {
641 self.type_name.as_str()
642 }
643
644 pub fn metadata(&self) -> Option<&Params> {
646 self.metadata.as_ref()
647 }
648
649 pub fn metadata_str(&self) -> String {
651 self.metadata.as_ref().map_or_else(
652 || "null".to_string(),
653 |metadata| to_string(metadata).unwrap_or_default(),
654 )
655 }
656
657 pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
659 self.metadata.as_ref().map(|p| {
660 p.iter()
661 .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
662 .collect()
663 })
664 }
665
666 pub fn precomputed_hash(&self) -> u64 {
668 self.hash
669 }
670
671 pub fn topic(&self) -> &str {
673 self.topic.as_str()
674 }
675
676 pub fn identifier(&self) -> Option<&str> {
678 self.identifier.as_deref()
679 }
680
681 pub fn instrument_id(&self) -> Option<InstrumentId> {
689 let metadata = self.metadata.as_ref().expect("metadata was `None`");
690 let instrument_id = metadata.get_str("instrument_id")?;
691 Some(
692 InstrumentId::from_str(instrument_id)
693 .expect("Invalid `InstrumentId` for 'instrument_id'"),
694 )
695 }
696
697 pub fn venue(&self) -> Option<Venue> {
705 let metadata = self.metadata.as_ref().expect("metadata was `None`");
706 let venue_str = metadata.get_str("venue")?;
707 Some(Venue::from(venue_str))
708 }
709
710 pub fn start(&self) -> Option<UnixNanos> {
718 let metadata = self.metadata.as_ref()?;
719 let start_str = metadata.get_str("start")?;
720 Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
721 }
722
723 pub fn end(&self) -> Option<UnixNanos> {
731 let metadata = self.metadata.as_ref()?;
732 let end_str = metadata.get_str("end")?;
733 Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
734 }
735
736 pub fn limit(&self) -> Option<usize> {
744 let metadata = self.metadata.as_ref()?;
745 metadata.get_usize("limit").or_else(|| {
746 metadata
747 .get_str("limit")
748 .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
749 })
750 }
751}
752
753impl PartialEq for DataType {
754 fn eq(&self, other: &Self) -> bool {
755 self.topic == other.topic
756 }
757}
758
// Marker impl: equality is plain string equality on the topic.
impl Eq for DataType {}
760
impl PartialOrd for DataType {
    /// Delegates to the total order defined by the `Ord` impl, so this never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
766
767impl Ord for DataType {
768 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
769 self.topic.cmp(&other.topic)
770 }
771}
772
773impl Hash for DataType {
774 fn hash<H: Hasher>(&self, state: &mut H) {
775 self.hash.hash(state);
776 }
777}
778
779impl Display for DataType {
780 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781 write!(f, "{}", self.topic)
782 }
783}
784
785impl Debug for DataType {
786 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
787 write!(
788 f,
789 "DataType(type_name={}, metadata={:?}, identifier={:?})",
790 self.type_name, self.metadata, self.identifier
791 )
792 }
793}
794
#[cfg(test)]
mod tests {
    use std::hash::DefaultHasher;

    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Builds optional metadata from a JSON object literal.
    fn meta(value: serde_json::Value) -> Option<Params> {
        Some(serde_json::from_value(value).expect("valid Params JSON"))
    }

    /// Hashes a data type with a fresh default hasher.
    fn digest(data_type: &DataType) -> u64 {
        let mut hasher = DefaultHasher::new();
        data_type.hash(&mut hasher);
        hasher.finish()
    }

    #[rstest]
    fn test_data_type_creation_with_metadata() {
        let metadata = meta(json!({"key1": "value1", "key2": "value2"}));
        let created = DataType::new("ExampleType", metadata.clone(), None);

        assert_eq!(created.type_name(), "ExampleType");
        assert_eq!(created.topic(), "ExampleType.key1=value1.key2=value2");
        assert_eq!(created.metadata(), metadata.as_ref());
    }

    #[rstest]
    fn test_data_type_creation_without_metadata() {
        let created = DataType::new("ExampleType", None, None);

        assert_eq!(created.type_name(), "ExampleType");
        assert_eq!(created.topic(), "ExampleType");
        assert_eq!(created.metadata(), None);
    }

    #[rstest]
    fn test_data_type_equality() {
        let first = DataType::new("ExampleType", meta(json!({"key1": "value1"})), None);
        let second = DataType::new("ExampleType", meta(json!({"key1": "value1"})), None);

        assert_eq!(first, second);
    }

    #[rstest]
    fn test_data_type_inequality() {
        let first = DataType::new("ExampleType", meta(json!({"key1": "value1"})), None);
        let second = DataType::new("ExampleType", meta(json!({"key2": "value2"})), None);

        assert_ne!(first, second);
    }

    #[rstest]
    fn test_data_type_ordering() {
        let smaller = DataType::new("ExampleTypeA", meta(json!({"key1": "value1"})), None);
        let larger = DataType::new("ExampleTypeB", meta(json!({"key2": "value2"})), None);

        assert!(smaller < larger);
    }

    #[rstest]
    fn test_data_type_hash() {
        let metadata = meta(json!({"key1": "value1"}));

        let first = DataType::new("ExampleType", metadata.clone(), None);
        let second = DataType::new("ExampleType", metadata, None);

        assert_eq!(digest(&first), digest(&second));
    }

    #[rstest]
    fn test_data_type_display() {
        let data_type = DataType::new("ExampleType", meta(json!({"key1": "value1"})), None);

        assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
    }

    #[rstest]
    fn test_data_type_debug() {
        let metadata = meta(json!({"key1": "value1"}));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        let expected =
            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)");
        assert_eq!(format!("{data_type:?}"), expected);
    }

    #[rstest]
    fn test_parse_instrument_id_from_metadata() {
        let instrument_id_str = "MSFT.XNAS";
        let data_type = DataType::new(
            "InstrumentAny",
            meta(json!({"instrument_id": instrument_id_str})),
            None,
        );

        let expected = InstrumentId::from_str(instrument_id_str).unwrap();
        assert_eq!(data_type.instrument_id().unwrap(), expected);
    }

    #[rstest]
    fn test_parse_venue_from_metadata() {
        let venue_str = "BINANCE";
        let data_type = DataType::new(
            stringify!(InstrumentAny),
            meta(json!({"venue": venue_str})),
            None,
        );

        assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
    }

    #[rstest]
    fn test_parse_start_from_metadata() {
        let start_ns = 1600054595844758000;
        let metadata = meta(json!({"start": start_ns.to_string()}));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns));
    }

    #[rstest]
    fn test_parse_end_from_metadata() {
        let end_ns = 1720954595844758000;
        let metadata = meta(json!({"end": end_ns.to_string()}));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns));
    }

    #[rstest]
    fn test_parse_limit_from_metadata() {
        let limit = 1000;
        let metadata = meta(json!({"limit": limit}));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.limit().unwrap(), limit);
    }

    #[rstest]
    fn test_data_type_persistence_json_with_identifier() {
        let original = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
        let json = original.to_persistence_json().unwrap();

        // The topic is derived data and must not be persisted.
        assert!(!json.contains("topic"));
        assert!(json.contains("\"identifier\":\"venue//symbol\""));

        let restored = DataType::from_persistence_json(&json).unwrap();
        assert_eq!(restored.type_name(), "MyCustomType");
        assert_eq!(restored.identifier(), Some("venue//symbol"));
        assert_eq!(restored.topic(), "MyCustomType");
    }

    #[rstest]
    fn test_data_type_identifier_getter() {
        let with_id = DataType::new("T", None, Some("id".to_string()));
        assert_eq!(with_id.identifier(), Some("id"));

        let without_id = DataType::new("T", None, None);
        assert_eq!(without_id.identifier(), None);
    }
}