1pub mod bar;
19pub mod bet;
20pub mod black_scholes;
21pub mod close;
22pub mod custom;
23pub mod delta;
24pub mod deltas;
25pub mod depth;
26pub mod forward;
27pub mod funding;
28pub mod greeks;
29pub mod option_chain;
30pub mod order;
31pub mod prices;
32pub mod quote;
33pub mod registry;
34pub mod status;
35pub mod trade;
36
37#[cfg(any(test, feature = "stubs"))]
38pub mod stubs;
39
40use std::{
41 fmt::{Debug, Display},
42 hash::{Hash, Hasher},
43 str::FromStr,
44};
45
46use nautilus_core::{Params, UnixNanos};
47use serde::{Deserialize, Serialize};
48use serde_json::Value as JsonValue;
49
50#[cfg(feature = "defi")]
51use crate::defi::DefiData;
52#[rustfmt::skip] pub use bar::{Bar, BarSpecification, BarType};
55pub use black_scholes::Greeks;
56pub use close::InstrumentClose;
57#[cfg(feature = "python")]
58pub use custom::PythonCustomDataWrapper;
59pub use custom::{
60 CustomData, CustomDataTrait, ensure_custom_data_json_registered, register_custom_data_json,
61};
62#[cfg(feature = "python")]
63pub use custom::{
64 get_python_data_class, reconstruct_python_custom_data, register_python_data_class,
65};
66pub use delta::OrderBookDelta;
67pub use deltas::{OrderBookDeltas, OrderBookDeltas_API};
68pub use depth::{DEPTH10_LEN, OrderBookDepth10};
69pub use forward::ForwardPrice;
70pub use funding::FundingRateUpdate;
71pub use greeks::{
72 BlackScholesGreeksResult, GreeksData, HasGreeks, OptionGreekValues, PortfolioGreeks,
73 YieldCurveData, black_scholes_greeks, imply_vol_and_greeks, refine_vol_and_greeks,
74};
75pub use option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange};
76pub use order::{BookOrder, NULL_ORDER};
77pub use prices::{IndexPriceUpdate, MarkPriceUpdate};
78pub use quote::QuoteTick;
79#[cfg(feature = "arrow")]
80pub use registry::{
81 ArrowDecoder, ArrowEncoder, decode_custom_from_arrow, encode_custom_to_arrow,
82 ensure_arrow_registered, get_arrow_schema, register_arrow,
83};
84#[cfg(feature = "python")]
85pub use registry::{
86 PyExtractor, ensure_py_extractor_registered, ensure_rust_extractor_factory_registered,
87 ensure_rust_extractor_registered, get_rust_extractor, register_py_extractor,
88 register_rust_extractor, register_rust_extractor_factory, try_extract_from_py,
89};
90pub use registry::{
91 deserialize_custom_from_json, ensure_json_deserializer_registered, register_json_deserializer,
92};
93pub use status::InstrumentStatus;
94pub use trade::TradeTick;
95
96use crate::identifiers::{InstrumentId, Venue};
/// A single item of market (or custom) data, represented as one enum.
///
/// Each variant wraps exactly one concrete data type. The `Depth10` and `Defi`
/// payloads are stored behind a `Box`.
#[derive(Debug)]
pub enum Data {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a boxed [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>),
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate),
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate),
    /// Wraps a [`FundingRateUpdate`].
    FundingRateUpdate(FundingRateUpdate),
    /// Wraps an [`OptionGreeks`].
    OptionGreeks(OptionGreeks),
    /// Wraps an [`InstrumentStatus`].
    InstrumentStatus(InstrumentStatus),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
    /// Wraps a [`CustomData`].
    Custom(CustomData),
    /// Wraps a boxed `DefiData`; only compiled with the `defi` feature.
    #[cfg(feature = "defi")]
    Defi(Box<DefiData>),
}
119
/// The `#[repr(C)]` counterpart of [`Data`], compiled only with the `ffi` feature.
///
/// It carries a subset of the [`Data`] variants: there is no `FundingRateUpdate`,
/// `OptionGreeks`, `InstrumentStatus`, `Custom` or `Defi` variant here.
#[cfg(feature = "ffi")]
#[repr(C)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
pub enum DataFFI {
    /// Wraps an [`OrderBookDelta`].
    Delta(OrderBookDelta),
    /// Wraps an [`OrderBookDeltas_API`].
    Deltas(OrderBookDeltas_API),
    /// Wraps a boxed [`OrderBookDepth10`].
    Depth10(Box<OrderBookDepth10>),
    /// Wraps a [`QuoteTick`].
    Quote(QuoteTick),
    /// Wraps a [`TradeTick`].
    Trade(TradeTick),
    /// Wraps a [`Bar`].
    Bar(Bar),
    /// Wraps a [`MarkPriceUpdate`].
    MarkPriceUpdate(MarkPriceUpdate),
    /// Wraps an [`IndexPriceUpdate`].
    IndexPriceUpdate(IndexPriceUpdate),
    /// Wraps an [`InstrumentClose`].
    InstrumentClose(InstrumentClose),
}
139
#[cfg(feature = "ffi")]
impl TryFrom<Data> for DataFFI {
    type Error = anyhow::Error;

    /// Converts a [`Data`] value into its [`DataFFI`] counterpart.
    ///
    /// # Errors
    ///
    /// Returns an error for every [`Data`] variant that has no [`DataFFI`]
    /// equivalent (`FundingRateUpdate`, `OptionGreeks`, `InstrumentStatus`,
    /// `Custom` and, with the `defi` feature, `Defi`).
    fn try_from(value: Data) -> Result<Self, Self::Error> {
        let converted = match value {
            // Variants with a one-to-one FFI representation.
            Data::Delta(delta) => Self::Delta(delta),
            Data::Deltas(deltas) => Self::Deltas(deltas),
            Data::Depth10(depth) => Self::Depth10(depth),
            Data::Quote(quote) => Self::Quote(quote),
            Data::Trade(trade) => Self::Trade(trade),
            Data::Bar(bar) => Self::Bar(bar),
            Data::MarkPriceUpdate(mark) => Self::MarkPriceUpdate(mark),
            Data::IndexPriceUpdate(index) => Self::IndexPriceUpdate(index),
            Data::InstrumentClose(close) => Self::InstrumentClose(close),
            // Variants that cannot be represented by `DataFFI`.
            Data::FundingRateUpdate(_) => {
                anyhow::bail!("Cannot convert Data::FundingRateUpdate to DataFFI")
            }
            Data::OptionGreeks(_) => {
                anyhow::bail!("Cannot convert Data::OptionGreeks to DataFFI")
            }
            Data::InstrumentStatus(_) => {
                anyhow::bail!("Cannot convert Data::InstrumentStatus to DataFFI")
            }
            Data::Custom(_) => anyhow::bail!("Cannot convert Data::Custom to DataFFI"),
            #[cfg(feature = "defi")]
            Data::Defi(_) => anyhow::bail!("Cannot convert Data::Defi to DataFFI"),
        };

        Ok(converted)
    }
}
170
#[cfg(feature = "ffi")]
impl From<DataFFI> for Data {
    /// Converts a [`DataFFI`] value into the [`Data`] variant of the same name.
    fn from(value: DataFFI) -> Self {
        match value {
            DataFFI::Delta(delta) => Data::Delta(delta),
            DataFFI::Deltas(deltas) => Data::Deltas(deltas),
            DataFFI::Depth10(depth) => Data::Depth10(depth),
            DataFFI::Quote(quote) => Data::Quote(quote),
            DataFFI::Trade(trade) => Data::Trade(trade),
            DataFFI::Bar(bar) => Data::Bar(bar),
            DataFFI::MarkPriceUpdate(mark) => Data::MarkPriceUpdate(mark),
            DataFFI::IndexPriceUpdate(index) => Data::IndexPriceUpdate(index),
            DataFFI::InstrumentClose(close) => Data::InstrumentClose(close),
        }
    }
}
187
188impl<'de> Deserialize<'de> for Data {
189 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
190 where
191 D: serde::Deserializer<'de>,
192 {
193 use serde::de::Error;
194 let value = serde_json::Value::deserialize(deserializer)?;
195 let type_name = value
196 .get("type")
197 .and_then(|v| v.as_str())
198 .ok_or_else(|| D::Error::custom("Missing 'type' field in Data"))?
199 .to_string();
200
201 match type_name.as_str() {
202 "OrderBookDelta" => Ok(Self::Delta(
203 serde_json::from_value(value).map_err(D::Error::custom)?,
204 )),
205 "OrderBookDeltas" => Ok(Self::Deltas(
206 serde_json::from_value(value).map_err(D::Error::custom)?,
207 )),
208 "OrderBookDepth10" => Ok(Self::Depth10(
209 serde_json::from_value(value).map_err(D::Error::custom)?,
210 )),
211 "QuoteTick" => Ok(Self::Quote(
212 serde_json::from_value(value).map_err(D::Error::custom)?,
213 )),
214 "TradeTick" => Ok(Self::Trade(
215 serde_json::from_value(value).map_err(D::Error::custom)?,
216 )),
217 "Bar" => Ok(Self::Bar(
218 serde_json::from_value(value).map_err(D::Error::custom)?,
219 )),
220 "MarkPriceUpdate" => Ok(Self::MarkPriceUpdate(
221 serde_json::from_value(value).map_err(D::Error::custom)?,
222 )),
223 "IndexPriceUpdate" => Ok(Self::IndexPriceUpdate(
224 serde_json::from_value(value).map_err(D::Error::custom)?,
225 )),
226 "FundingRateUpdate" => Ok(Self::FundingRateUpdate(
227 serde_json::from_value(value).map_err(D::Error::custom)?,
228 )),
229 "OptionGreeks" => Ok(Self::OptionGreeks(
230 serde_json::from_value(value).map_err(D::Error::custom)?,
231 )),
232 "InstrumentStatus" => Ok(Self::InstrumentStatus(
233 serde_json::from_value(value).map_err(D::Error::custom)?,
234 )),
235 "InstrumentClose" => Ok(Self::InstrumentClose(
236 serde_json::from_value(value).map_err(D::Error::custom)?,
237 )),
238 _ => {
239 if let Some(data) =
240 deserialize_custom_from_json(&type_name, &value).map_err(D::Error::custom)?
241 {
242 Ok(data)
243 } else {
244 Err(D::Error::custom(format!("Unknown Data type: {type_name}")))
245 }
246 }
247 }
248 }
249}
250
251impl Clone for Data {
252 fn clone(&self) -> Self {
253 match self {
254 Self::Delta(x) => Self::Delta(*x),
255 Self::Deltas(x) => Self::Deltas(x.clone()),
256 Self::Depth10(x) => Self::Depth10(x.clone()),
257 Self::Quote(x) => Self::Quote(*x),
258 Self::Trade(x) => Self::Trade(*x),
259 Self::Bar(x) => Self::Bar(*x),
260 Self::MarkPriceUpdate(x) => Self::MarkPriceUpdate(*x),
261 Self::IndexPriceUpdate(x) => Self::IndexPriceUpdate(*x),
262 Self::FundingRateUpdate(x) => Self::FundingRateUpdate(*x),
263 Self::OptionGreeks(x) => Self::OptionGreeks(*x),
264 Self::InstrumentStatus(x) => Self::InstrumentStatus(*x),
265 Self::InstrumentClose(x) => Self::InstrumentClose(*x),
266 Self::Custom(x) => Self::Custom(x.clone()),
267 #[cfg(feature = "defi")]
268 Self::Defi(x) => Self::Defi(x.clone()),
269 }
270 }
271}
272
273impl PartialEq for Data {
274 fn eq(&self, other: &Self) -> bool {
275 match (self, other) {
276 (Self::Delta(a), Self::Delta(b)) => a == b,
277 (Self::Deltas(a), Self::Deltas(b)) => a == b,
278 (Self::Depth10(a), Self::Depth10(b)) => a == b,
279 (Self::Quote(a), Self::Quote(b)) => a == b,
280 (Self::Trade(a), Self::Trade(b)) => a == b,
281 (Self::Bar(a), Self::Bar(b)) => a == b,
282 (Self::MarkPriceUpdate(a), Self::MarkPriceUpdate(b)) => a == b,
283 (Self::IndexPriceUpdate(a), Self::IndexPriceUpdate(b)) => a == b,
284 (Self::FundingRateUpdate(a), Self::FundingRateUpdate(b)) => a == b,
285 (Self::OptionGreeks(a), Self::OptionGreeks(b)) => a == b,
286 (Self::InstrumentStatus(a), Self::InstrumentStatus(b)) => a == b,
287 (Self::InstrumentClose(a), Self::InstrumentClose(b)) => a == b,
288 (Self::Custom(a), Self::Custom(b)) => a == b,
289 #[cfg(feature = "defi")]
290 (Self::Defi(a), Self::Defi(b)) => a == b,
291 _ => false,
292 }
293 }
294}
295
296impl Serialize for Data {
297 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
298 where
299 S: serde::Serializer,
300 {
301 match self {
302 Self::Delta(x) => x.serialize(serializer),
303 Self::Deltas(x) => x.serialize(serializer),
304 Self::Depth10(x) => x.serialize(serializer),
305 Self::Quote(x) => x.serialize(serializer),
306 Self::Trade(x) => x.serialize(serializer),
307 Self::Bar(x) => x.serialize(serializer),
308 Self::MarkPriceUpdate(x) => x.serialize(serializer),
309 Self::IndexPriceUpdate(x) => x.serialize(serializer),
310 Self::FundingRateUpdate(x) => x.serialize(serializer),
311 Self::OptionGreeks(x) => x.serialize(serializer),
312 Self::InstrumentStatus(x) => x.serialize(serializer),
313 Self::InstrumentClose(x) => x.serialize(serializer),
314 Self::Custom(x) => x.serialize(serializer),
315 #[cfg(feature = "defi")]
316 Self::Defi(_) => Err(serde::ser::Error::custom(
317 "Data::Defi serialization is not supported",
318 )),
319 }
320 }
321}
322
/// Implements `TryFrom<Data>` for `$type` by unwrapping the `Data::$variant` payload.
///
/// The generated conversion yields `Err(())` for every other variant.
macro_rules! impl_try_from_data {
    ($variant:ident, $type:ty) => {
        impl TryFrom<Data> for $type {
            type Error = ();

            fn try_from(value: Data) -> Result<Self, Self::Error> {
                if let Data::$variant(inner) = value {
                    Ok(inner)
                } else {
                    Err(())
                }
            }
        }
    };
}
337
338impl TryFrom<Data> for OrderBookDepth10 {
339 type Error = ();
340
341 fn try_from(value: Data) -> Result<Self, Self::Error> {
342 match value {
343 Data::Depth10(x) => Ok(*x),
344 _ => Err(()),
345 }
346 }
347}
348
// `TryFrom<Data>` for every variant whose payload is stored unboxed.
// Arguments are (variant name, payload type).
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);
impl_try_from_data!(MarkPriceUpdate, MarkPriceUpdate);
impl_try_from_data!(IndexPriceUpdate, IndexPriceUpdate);
impl_try_from_data!(FundingRateUpdate, FundingRateUpdate);
impl_try_from_data!(OptionGreeks, OptionGreeks);
impl_try_from_data!(InstrumentStatus, InstrumentStatus);
impl_try_from_data!(InstrumentClose, InstrumentClose);
360
361#[must_use]
366pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
367 data.into_iter()
368 .filter_map(|d| T::try_from(d).ok())
369 .collect()
370}
371
372impl Data {
373 #[must_use]
375 pub fn instrument_id(&self) -> InstrumentId {
376 match self {
377 Self::Delta(delta) => delta.instrument_id,
378 Self::Deltas(deltas) => deltas.instrument_id,
379 Self::Depth10(depth) => depth.instrument_id,
380 Self::Quote(quote) => quote.instrument_id,
381 Self::Trade(trade) => trade.instrument_id,
382 Self::Bar(bar) => bar.bar_type.instrument_id(),
383 Self::MarkPriceUpdate(mark_price) => mark_price.instrument_id,
384 Self::IndexPriceUpdate(index_price) => index_price.instrument_id,
385 Self::FundingRateUpdate(funding_rate) => funding_rate.instrument_id,
386 Self::OptionGreeks(greeks) => greeks.instrument_id,
387 Self::InstrumentStatus(status) => status.instrument_id,
388 Self::InstrumentClose(close) => close.instrument_id,
389 Self::Custom(custom) => custom
390 .data_type
391 .identifier()
392 .and_then(|s| InstrumentId::from_str(s).ok())
393 .or_else(|| {
394 custom
395 .data_type
396 .metadata()
397 .and_then(|m| m.get_str("instrument_id"))
398 .and_then(|s| InstrumentId::from_str(s).ok())
399 })
400 .unwrap_or_else(|| InstrumentId::from("NULL.NULL")),
401 #[cfg(feature = "defi")]
402 Self::Defi(defi) => defi.instrument_id(),
403 }
404 }
405
406 #[must_use]
408 pub fn is_order_book_data(&self) -> bool {
409 matches!(self, Self::Delta(_) | Self::Deltas(_) | Self::Depth10(_))
410 }
411}
412
/// Implemented by types that expose a `ts_init` timestamp.
pub trait HasTsInit {
    /// Returns the value's `ts_init` timestamp as [`UnixNanos`].
    fn ts_init(&self) -> UnixNanos;
}
422
/// Associates a type with a static path prefix string.
///
/// NOTE(review): presumably the directory name used for the type in a data
/// catalog; confirm against the catalog implementation.
pub trait CatalogPathPrefix {
    /// Returns the path prefix for the implementing type.
    fn path_prefix() -> &'static str;
}
428
/// Implements [`CatalogPathPrefix`] for a type so that `path_prefix()` returns
/// the given expression.
///
/// Invoke as `impl_catalog_path_prefix!(Type, "prefix");`.
#[macro_export]
macro_rules! impl_catalog_path_prefix {
    ($target:ty, $prefix:expr) => {
        impl $crate::data::CatalogPathPrefix for $target {
            fn path_prefix() -> &'static str {
                $prefix
            }
        }
    };
}
448
// Path prefixes for the built-in data types.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
// NOTE(review): this prefix is singular while the others above are plural; it is
// left as-is because the string is returned at runtime.
impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
impl_catalog_path_prefix!(OptionGreeks, "option_greeks");
impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");

// Instruments also get a path prefix.
use crate::instruments::InstrumentAny;
impl_catalog_path_prefix!(InstrumentAny, "instruments");
464
465impl HasTsInit for Data {
466 fn ts_init(&self) -> UnixNanos {
467 match self {
468 Self::Delta(d) => d.ts_init,
469 Self::Deltas(d) => d.ts_init,
470 Self::Depth10(d) => d.ts_init,
471 Self::Quote(q) => q.ts_init,
472 Self::Trade(t) => t.ts_init,
473 Self::Bar(b) => b.ts_init,
474 Self::MarkPriceUpdate(p) => p.ts_init,
475 Self::IndexPriceUpdate(p) => p.ts_init,
476 Self::FundingRateUpdate(f) => f.ts_init,
477 Self::OptionGreeks(g) => g.ts_init,
478 Self::InstrumentStatus(s) => s.ts_init,
479 Self::InstrumentClose(c) => c.ts_init,
480 Self::Custom(c) => c.data.ts_init(),
481 #[cfg(feature = "defi")]
482 Self::Defi(d) => d.ts_init(),
483 }
484 }
485}
486
487pub fn is_monotonically_increasing_by_init<T: HasTsInit>(data: &[T]) -> bool {
491 data.array_windows()
492 .all(|[a, b]| a.ts_init() <= b.ts_init())
493}
494
495impl From<OrderBookDelta> for Data {
496 fn from(value: OrderBookDelta) -> Self {
497 Self::Delta(value)
498 }
499}
500
501impl From<OrderBookDeltas_API> for Data {
502 fn from(value: OrderBookDeltas_API) -> Self {
503 Self::Deltas(value)
504 }
505}
506
507impl From<OrderBookDepth10> for Data {
508 fn from(value: OrderBookDepth10) -> Self {
509 Self::Depth10(Box::new(value))
510 }
511}
512
513impl From<QuoteTick> for Data {
514 fn from(value: QuoteTick) -> Self {
515 Self::Quote(value)
516 }
517}
518
519impl From<TradeTick> for Data {
520 fn from(value: TradeTick) -> Self {
521 Self::Trade(value)
522 }
523}
524
525impl From<Bar> for Data {
526 fn from(value: Bar) -> Self {
527 Self::Bar(value)
528 }
529}
530
531impl From<MarkPriceUpdate> for Data {
532 fn from(value: MarkPriceUpdate) -> Self {
533 Self::MarkPriceUpdate(value)
534 }
535}
536
537impl From<IndexPriceUpdate> for Data {
538 fn from(value: IndexPriceUpdate) -> Self {
539 Self::IndexPriceUpdate(value)
540 }
541}
542
543impl From<FundingRateUpdate> for Data {
544 fn from(value: FundingRateUpdate) -> Self {
545 Self::FundingRateUpdate(value)
546 }
547}
548
549impl From<OptionGreeks> for Data {
550 fn from(value: OptionGreeks) -> Self {
551 Self::OptionGreeks(value)
552 }
553}
554
555impl From<InstrumentStatus> for Data {
556 fn from(value: InstrumentStatus) -> Self {
557 Self::InstrumentStatus(value)
558 }
559}
560
561impl From<InstrumentClose> for Data {
562 fn from(value: InstrumentClose) -> Self {
563 Self::InstrumentClose(value)
564 }
565}
566
#[cfg(feature = "defi")]
impl From<DefiData> for Data {
    /// Boxes the DeFi payload and wraps it in `Data::Defi`.
    fn from(value: DefiData) -> Self {
        let boxed = Box::new(value);
        Data::Defi(boxed)
    }
}
573
574fn value_to_topic_string(v: &JsonValue) -> String {
576 if let Some(s) = v.as_str() {
577 return s.to_string();
578 }
579
580 if let Some(n) = v.as_u64() {
581 return n.to_string();
582 }
583
584 if let Some(n) = v.as_i64() {
585 return n.to_string();
586 }
587
588 if let Some(b) = v.as_bool() {
589 return b.to_string();
590 }
591
592 if let Some(f) = v.as_f64() {
593 return f.to_string();
594 }
595
596 if v.is_null() {
597 return "null".to_string();
598 }
599 serde_json::to_string(v).unwrap_or_default()
600}
601
602fn params_to_topic_suffix(params: &Params) -> String {
604 let mut entries = params.iter().collect::<Vec<_>>();
605 entries.sort_by_key(|(key, _)| *key);
606
607 entries
608 .into_iter()
609 .map(|(k, v)| format!("{k}={}", value_to_topic_string(v)))
610 .collect::<Vec<_>>()
611 .join(".")
612}
613
/// Describes a kind of data by type name plus optional metadata and identifier.
///
/// Equality and ordering use only `topic`, and hashing uses only the stored
/// `hash`.
///
/// NOTE(review): `Deserialize` is derived, so a deserialized value takes `topic`
/// and `hash` from the input as-is rather than recomputing them; confirm that
/// inputs are always produced by this same type.
#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.model")
)]
pub struct DataType {
    /// Name of the data type.
    type_name: String,
    /// Optional metadata parameters.
    metadata: Option<Params>,
    /// Topic string; the constructors in this file build it from the type name
    /// and metadata, or take it from the caller (`from_parts`).
    topic: String,
    /// Hash of `topic`, as computed by the constructors in this file.
    hash: u64,
    /// Optional identifier; not part of the topic.
    identifier: Option<String>,
}
631
632impl DataType {
633 #[must_use]
635 pub fn new(type_name: &str, metadata: Option<Params>, identifier: Option<String>) -> Self {
636 let topic = if let Some(ref meta) = metadata {
638 if meta.is_empty() {
639 type_name.to_string()
640 } else {
641 format!("{type_name}.{}", params_to_topic_suffix(meta))
642 }
643 } else {
644 type_name.to_string()
645 };
646
647 let mut hasher = std::collections::hash_map::DefaultHasher::new();
649 topic.hash(&mut hasher);
650
651 Self {
652 type_name: type_name.to_owned(),
653 metadata,
654 topic,
655 hash: hasher.finish(),
656 identifier,
657 }
658 }
659
660 #[must_use]
664 pub fn from_parts(type_name: &str, topic: &str, metadata: Option<Params>) -> Self {
665 let mut hasher = std::collections::hash_map::DefaultHasher::new();
666 topic.hash(&mut hasher);
667 Self {
668 type_name: type_name.to_owned(),
669 metadata,
670 topic: topic.to_owned(),
671 hash: hasher.finish(),
672 identifier: None,
673 }
674 }
675
676 pub fn to_persistence_json(&self) -> Result<String, serde_json::Error> {
682 let mut map = serde_json::Map::new();
683 map.insert(
684 "type_name".to_string(),
685 serde_json::Value::String(self.type_name.clone()),
686 );
687 map.insert(
688 "metadata".to_string(),
689 self.metadata.as_ref().map_or(serde_json::Value::Null, |m| {
690 serde_json::to_value(m).unwrap_or(serde_json::Value::Null)
691 }),
692 );
693
694 if let Some(ref id) = self.identifier {
695 map.insert(
696 "identifier".to_string(),
697 serde_json::Value::String(id.clone()),
698 );
699 }
700 serde_json::to_string(&serde_json::Value::Object(map))
701 }
702
703 pub fn from_persistence_json(s: &str) -> Result<Self, anyhow::Error> {
710 let value: serde_json::Value =
711 serde_json::from_str(s).map_err(|e| anyhow::anyhow!("Invalid data_type JSON: {e}"))?;
712 let obj = value
713 .as_object()
714 .ok_or_else(|| anyhow::anyhow!("data_type must be a JSON object"))?;
715 let type_name = obj
716 .get("type_name")
717 .and_then(|v| v.as_str())
718 .ok_or_else(|| anyhow::anyhow!("data_type must have type_name"))?
719 .to_string();
720 let metadata = obj.get("metadata").and_then(|m| {
721 if m.is_null() {
722 None
723 } else {
724 let p: Params = serde_json::from_value(m.clone()).ok()?;
725 if p.is_empty() { None } else { Some(p) }
726 }
727 });
728 let identifier = obj
729 .get("identifier")
730 .and_then(|v| v.as_str())
731 .map(String::from);
732 Ok(Self::new(&type_name, metadata, identifier))
733 }
734
735 #[must_use]
737 pub fn type_name(&self) -> &str {
738 self.type_name.as_str()
739 }
740
741 #[must_use]
743 pub fn metadata(&self) -> Option<&Params> {
744 self.metadata.as_ref()
745 }
746
747 #[must_use]
749 pub fn metadata_str(&self) -> String {
750 self.metadata.as_ref().map_or_else(
751 || "null".to_string(),
752 |metadata| {
753 let mut entries = metadata.iter().collect::<Vec<_>>();
754 entries.sort_by_key(|(key, _)| *key);
755
756 let mut metadata_map = serde_json::Map::new();
757 for (key, value) in entries {
758 metadata_map.insert(key.clone(), value.clone());
759 }
760
761 serde_json::to_string(&metadata_map).unwrap_or_default()
762 },
763 )
764 }
765
766 #[must_use]
768 pub fn metadata_string_map(&self) -> Option<std::collections::HashMap<String, String>> {
769 self.metadata.as_ref().map(|p| {
770 p.iter()
771 .map(|(k, v)| (k.clone(), value_to_topic_string(v)))
772 .collect()
773 })
774 }
775
776 #[must_use]
778 pub fn precomputed_hash(&self) -> u64 {
779 self.hash
780 }
781
782 #[must_use]
784 pub fn topic(&self) -> &str {
785 self.topic.as_str()
786 }
787
788 #[must_use]
790 pub fn identifier(&self) -> Option<&str> {
791 self.identifier.as_deref()
792 }
793
794 #[must_use]
801 pub fn instrument_id(&self) -> Option<InstrumentId> {
802 let metadata = self.metadata.as_ref()?;
803 let instrument_id = metadata.get_str("instrument_id")?;
804 Some(
805 InstrumentId::from_str(instrument_id)
806 .expect("Invalid `InstrumentId` for 'instrument_id'"),
807 )
808 }
809
810 #[must_use]
817 pub fn venue(&self) -> Option<Venue> {
818 let metadata = self.metadata.as_ref()?;
819 let venue_str = metadata.get_str("venue")?;
820 Some(Venue::from(venue_str))
821 }
822
823 #[must_use]
830 pub fn start(&self) -> Option<UnixNanos> {
831 let metadata = self.metadata.as_ref()?;
832 let start_str = metadata.get_str("start")?;
833 Some(UnixNanos::from_str(start_str).expect("Invalid `UnixNanos` for 'start'"))
834 }
835
836 #[must_use]
843 pub fn end(&self) -> Option<UnixNanos> {
844 let metadata = self.metadata.as_ref()?;
845 let end_str = metadata.get_str("end")?;
846 Some(UnixNanos::from_str(end_str).expect("Invalid `UnixNanos` for 'end'"))
847 }
848
849 #[must_use]
856 pub fn limit(&self) -> Option<usize> {
857 let metadata = self.metadata.as_ref()?;
858 metadata.get_usize("limit").or_else(|| {
859 metadata
860 .get_str("limit")
861 .map(|s| s.parse::<usize>().expect("Invalid `usize` for 'limit'"))
862 })
863 }
864}
865
866impl PartialEq for DataType {
867 fn eq(&self, other: &Self) -> bool {
868 self.topic == other.topic
869 }
870}
871
// Equality is plain `String` comparison of `topic`, so it is a full equivalence relation.
impl Eq for DataType {}
873
874impl PartialOrd for DataType {
875 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
876 Some(self.cmp(other))
877 }
878}
879
880impl Ord for DataType {
881 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
882 self.topic.cmp(&other.topic)
883 }
884}
885
886impl Hash for DataType {
887 fn hash<H: Hasher>(&self, state: &mut H) {
888 self.hash.hash(state);
889 }
890}
891
892impl Display for DataType {
893 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
894 write!(f, "{}", self.topic)
895 }
896}
897
898impl Debug for DataType {
899 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
900 write!(
901 f,
902 "DataType(type_name={}, metadata={:?}, identifier={:?})",
903 self.type_name, self.metadata, self.identifier
904 )
905 }
906}
907
#[cfg(test)]
mod tests {
    use std::hash::DefaultHasher;

    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Builds `Params` from a JSON literal, panicking on invalid input.
    fn params_from_json(value: serde_json::Value) -> Params {
        serde_json::from_value(value).expect("valid Params JSON")
    }

    #[cfg(feature = "ffi")]
    #[rstest]
    fn test_funding_rate_update_does_not_convert_to_data_ffi() {
        let funding_rate = FundingRateUpdate::new(
            InstrumentId::from("BTCUSDT-PERP.BINANCE"),
            "0.0001".parse().unwrap(),
            Some(480),
            Some(UnixNanos::from(1_000_000_000)),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );

        let err = DataFFI::try_from(Data::FundingRateUpdate(funding_rate)).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Cannot convert Data::FundingRateUpdate to DataFFI"
        );
    }

    #[rstest]
    fn test_data_type_creation_with_metadata() {
        let metadata = Some(params_from_json(
            json!({"key1": "value1", "key2": "value2"}),
        ));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType.key1=value1.key2=value2");
        assert_eq!(data_type.metadata(), metadata.as_ref());
    }

    #[rstest]
    fn test_data_type_topic_identity_uses_canonical_metadata_order() {
        // Same entries, inserted in opposite order.
        let mut metadata1 = Params::new();
        metadata1.insert("b".to_string(), json!(2));
        metadata1.insert("a".to_string(), json!(1));
        let mut metadata2 = Params::new();
        metadata2.insert("a".to_string(), json!(1));
        metadata2.insert("b".to_string(), json!(2));

        let data_type1 = DataType::new("ExampleType", Some(metadata1), None);
        let data_type2 = DataType::new("ExampleType", Some(metadata2), None);
        let mut hasher1 = DefaultHasher::new();
        data_type1.hash(&mut hasher1);
        let hash1 = hasher1.finish();
        let mut hasher2 = DefaultHasher::new();
        data_type2.hash(&mut hasher2);
        let hash2 = hasher2.finish();

        assert_eq!(data_type1.topic(), "ExampleType.a=1.b=2");
        assert_eq!(data_type1.topic(), data_type2.topic());
        assert_eq!(data_type1, data_type2);
        assert_eq!(hash1, hash2);
        assert_eq!(format!("{data_type1}"), format!("{data_type2}"));
        assert_eq!(data_type1.metadata_str(), r#"{"a":1,"b":2}"#);
        assert_eq!(data_type1.metadata_str(), data_type2.metadata_str());
    }

    #[rstest]
    fn test_data_type_creation_without_metadata() {
        let data_type = DataType::new("ExampleType", None, None);

        assert_eq!(data_type.type_name(), "ExampleType");
        assert_eq!(data_type.topic(), "ExampleType");
        assert_eq!(data_type.metadata(), None);
    }

    #[rstest]
    fn test_data_type_equality() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key1": "value1"})));

        let data_type1 = DataType::new("ExampleType", metadata1, None);
        let data_type2 = DataType::new("ExampleType", metadata2, None);

        assert_eq!(data_type1, data_type2);
    }

    #[rstest]
    fn test_data_type_inequality() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));

        let data_type1 = DataType::new("ExampleType", metadata1, None);
        let data_type2 = DataType::new("ExampleType", metadata2, None);

        assert_ne!(data_type1, data_type2);
    }

    #[rstest]
    fn test_data_type_ordering() {
        let metadata1 = Some(params_from_json(json!({"key1": "value1"})));
        let metadata2 = Some(params_from_json(json!({"key2": "value2"})));

        let data_type1 = DataType::new("ExampleTypeA", metadata1, None);
        let data_type2 = DataType::new("ExampleTypeB", metadata2, None);

        assert!(data_type1 < data_type2);
    }

    #[rstest]
    fn test_data_type_hash() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));

        let data_type1 = DataType::new("ExampleType", metadata.clone(), None);
        let data_type2 = DataType::new("ExampleType", metadata, None);

        let mut hasher1 = DefaultHasher::new();
        data_type1.hash(&mut hasher1);
        let hash1 = hasher1.finish();

        let mut hasher2 = DefaultHasher::new();
        data_type2.hash(&mut hasher2);
        let hash2 = hasher2.finish();

        assert_eq!(hash1, hash2);
    }

    #[rstest]
    fn test_data_type_display() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        let data_type = DataType::new("ExampleType", metadata, None);

        assert_eq!(format!("{data_type}"), "ExampleType.key1=value1");
    }

    #[rstest]
    fn test_data_type_debug() {
        let metadata = Some(params_from_json(json!({"key1": "value1"})));
        let data_type = DataType::new("ExampleType", metadata.clone(), None);

        assert_eq!(
            format!("{data_type:?}"),
            format!("DataType(type_name=ExampleType, metadata={metadata:?}, identifier=None)")
        );
    }

    #[rstest]
    fn test_parse_instrument_id_from_metadata() {
        let instrument_id_str = "MSFT.XNAS";
        let metadata = Some(params_from_json(
            json!({"instrument_id": instrument_id_str}),
        ));
        let data_type = DataType::new("InstrumentAny", metadata, None);

        assert_eq!(
            data_type.instrument_id().unwrap(),
            InstrumentId::from_str(instrument_id_str).unwrap()
        );
    }

    #[rstest]
    fn test_parse_venue_from_metadata() {
        let venue_str = "BINANCE";
        let metadata = Some(params_from_json(json!({"venue": venue_str})));
        let data_type = DataType::new(stringify!(InstrumentAny), metadata, None);

        assert_eq!(data_type.venue().unwrap(), Venue::new(venue_str));
    }

    #[rstest]
    fn test_parse_start_from_metadata() {
        let start_ns = 1_600_054_595_844_758_000;
        let metadata = Some(params_from_json(json!({"start": start_ns.to_string()})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.start().unwrap(), UnixNanos::from(start_ns),);
    }

    #[rstest]
    fn test_parse_end_from_metadata() {
        let end_ns = 1_720_954_595_844_758_000;
        let metadata = Some(params_from_json(json!({"end": end_ns.to_string()})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.end().unwrap(), UnixNanos::from(end_ns),);
    }

    #[rstest]
    fn test_parse_limit_from_metadata() {
        let limit = 1000;
        let metadata = Some(params_from_json(json!({"limit": limit})));
        let data_type = DataType::new(stringify!(TradeTick), metadata, None);

        assert_eq!(data_type.limit().unwrap(), limit);
    }

    #[rstest]
    fn test_data_type_metadata_accessors_return_none_without_metadata() {
        let data_type = DataType::new(stringify!(TradeTick), None, None);

        assert_eq!(data_type.instrument_id(), None);
        assert_eq!(data_type.venue(), None);
        assert_eq!(data_type.start(), None);
        assert_eq!(data_type.end(), None);
        // `limit` is a metadata accessor too and must behave like the others.
        assert_eq!(data_type.limit(), None);
    }

    #[rstest]
    fn test_data_type_persistence_json_with_identifier() {
        let data_type = DataType::new("MyCustomType", None, Some("venue//symbol".to_string()));
        let json = data_type.to_persistence_json().unwrap();
        assert!(!json.contains("topic"));
        assert!(json.contains("\"identifier\":\"venue//symbol\""));
        let restored = DataType::from_persistence_json(&json).unwrap();
        assert_eq!(restored.type_name(), "MyCustomType");
        assert_eq!(restored.identifier(), Some("venue//symbol"));
        assert_eq!(restored.topic(), "MyCustomType");
    }

    #[rstest]
    fn test_data_type_from_persistence_json_rebuilds_canonical_topic() {
        // The stored `topic` is deliberately non-canonical; it must be ignored.
        let json = r#"{
            "type_name": "ExampleType",
            "topic": "ExampleType.z=9.a=1",
            "metadata": {"z": 9, "a": 1}
        }"#;

        let restored = DataType::from_persistence_json(json).unwrap();

        assert_eq!(restored.topic(), "ExampleType.a=1.z=9");
    }

    #[rstest]
    fn test_data_type_identifier_getter() {
        let data_type = DataType::new("T", None, Some("id".to_string()));
        assert_eq!(data_type.identifier(), Some("id"));
        let data_type_no_id = DataType::new("T", None, None);
        assert_eq!(data_type_no_id.identifier(), None);
    }
}