1use std::{any::type_name, collections::HashMap, fmt};
23
24use arrow::{datatypes::Schema, error::ArrowError, record_batch::RecordBatch};
25use nautilus_model::{
26 instruments::{
27 Instrument, InstrumentAny, betting::BettingInstrument, binary_option::BinaryOption,
28 cfd::Cfd, commodity::Commodity, crypto_future::CryptoFuture,
29 crypto_futures_spread::CryptoFuturesSpread, crypto_option::CryptoOption,
30 crypto_option_spread::CryptoOptionSpread, crypto_perpetual::CryptoPerpetual,
31 currency_pair::CurrencyPair, equity::Equity, futures_contract::FuturesContract,
32 futures_spread::FuturesSpread, index_instrument::IndexInstrument,
33 option_contract::OptionContract, option_spread::OptionSpread,
34 perpetual_contract::PerpetualContract, tokenized_asset::TokenizedAsset,
35 },
36 types::Currency,
37};
38
39#[allow(unused)]
40use crate::arrow::{
41 ArrowSchemaProvider, Data, DecodeDataFromRecordBatch, DecodeFromRecordBatch,
42 EncodeToRecordBatch, EncodingError, KEY_INSTRUMENT_ID,
43};
44
45pub mod betting;
46pub mod binary_option;
47pub mod cfd;
48pub mod commodity;
49pub mod crypto_future;
50pub mod crypto_futures_spread;
51pub mod crypto_option;
52pub mod crypto_option_spread;
53pub mod crypto_perpetual;
54pub mod currency_pair;
55pub mod equity;
56pub mod futures_contract;
57pub mod futures_spread;
58pub mod index_instrument;
59pub mod option_contract;
60pub mod option_spread;
61pub mod perpetual_contract;
62pub mod tokenized_asset;
63
64pub(crate) fn decode_currency(
69 value: &str,
70 field: &'static str,
71 context: &'static str,
72 row: usize,
73) -> Result<Currency, EncodingError> {
74 let trimmed = value.trim();
75 if trimmed.is_empty() {
76 return Err(EncodingError::ParseError(
77 field,
78 format!("row {row}: empty currency code"),
79 ));
80 }
81
82 Ok(Currency::get_or_create_crypto_with_context(
83 trimmed,
84 Some(context),
85 ))
86}
87
88const INSTRUMENT_VALIDATION_FIELD: &str = "instrument";
89
90pub(crate) fn instrument_validation_error<T>(
91 row: usize,
92 error: impl fmt::Display,
93) -> EncodingError {
94 let type_name = type_name::<T>();
95 let instrument_type = type_name.rsplit("::").next().unwrap_or(type_name);
96
97 EncodingError::ParseError(
98 INSTRUMENT_VALIDATION_FIELD,
99 format!("row {row}: invalid {instrument_type}: {error}"),
100 )
101}
102
103impl ArrowSchemaProvider for InstrumentAny {
104 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
105 let instrument_type = metadata
106 .as_ref()
107 .and_then(|m| m.get("class"))
108 .map_or("CurrencyPair", |s| s.as_str());
109
110 match instrument_type {
111 "BettingInstrument" => BettingInstrument::get_schema(metadata),
112 "BinaryOption" => BinaryOption::get_schema(metadata),
113 "Cfd" => Cfd::get_schema(metadata),
114 "Commodity" => Commodity::get_schema(metadata),
115 "CryptoFuture" => CryptoFuture::get_schema(metadata),
116 "CryptoFuturesSpread" => CryptoFuturesSpread::get_schema(metadata),
117 "CryptoOption" => CryptoOption::get_schema(metadata),
118 "CryptoOptionSpread" => CryptoOptionSpread::get_schema(metadata),
119 "CryptoPerpetual" => CryptoPerpetual::get_schema(metadata),
120 "CurrencyPair" => CurrencyPair::get_schema(metadata),
121 "Equity" => Equity::get_schema(metadata),
122 "FuturesContract" => FuturesContract::get_schema(metadata),
123 "FuturesSpread" => FuturesSpread::get_schema(metadata),
124 "IndexInstrument" => IndexInstrument::get_schema(metadata),
125 "OptionContract" => OptionContract::get_schema(metadata),
126 "OptionSpread" => OptionSpread::get_schema(metadata),
127 "PerpetualContract" => PerpetualContract::get_schema(metadata),
128 "TokenizedAsset" => TokenizedAsset::get_schema(metadata),
129 _ => {
130 CurrencyPair::get_schema(metadata)
132 }
133 }
134 }
135}
136
137impl EncodeToRecordBatch for InstrumentAny {
138 fn encode_batch(
139 #[allow(unused)] metadata: &HashMap<String, String>,
140 data: &[Self],
141 ) -> Result<RecordBatch, ArrowError> {
142 if data.is_empty() {
143 return Err(ArrowError::InvalidArgumentError(
144 "Cannot encode empty instrument batch".to_string(),
145 ));
146 }
147
148 let mut by_type: HashMap<String, Vec<&Self>> = HashMap::new();
149
150 for instrument in data {
151 let type_name = match instrument {
152 Self::Cfd(_) => "Cfd",
153 Self::Commodity(_) => "Commodity",
154 Self::CurrencyPair(_) => "CurrencyPair",
155 Self::Equity(_) => "Equity",
156 Self::CryptoFuture(_) => "CryptoFuture",
157 Self::CryptoFuturesSpread(_) => "CryptoFuturesSpread",
158 Self::CryptoPerpetual(_) => "CryptoPerpetual",
159 Self::CryptoOption(_) => "CryptoOption",
160 Self::CryptoOptionSpread(_) => "CryptoOptionSpread",
161 Self::FuturesContract(_) => "FuturesContract",
162 Self::FuturesSpread(_) => "FuturesSpread",
163 Self::IndexInstrument(_) => "IndexInstrument",
164 Self::OptionContract(_) => "OptionContract",
165 Self::OptionSpread(_) => "OptionSpread",
166 Self::BinaryOption(_) => "BinaryOption",
167 Self::Betting(_) => "BettingInstrument",
168 Self::PerpetualContract(_) => "PerpetualContract",
169 Self::TokenizedAsset(_) => "TokenizedAsset",
170 };
171 by_type
172 .entry(type_name.to_string())
173 .or_default()
174 .push(instrument);
175 }
176
177 if by_type.len() > 1 {
178 return Err(ArrowError::InvalidArgumentError(
179 "Cannot encode mixed instrument types in a single batch. Use separate batches for each type.".to_string(),
180 ));
181 }
182
183 let (type_name, instruments) = by_type.iter().next().unwrap();
184 match type_name.as_str() {
185 "Cfd" => {
186 let cfds: Vec<_> = instruments
187 .iter()
188 .map(|i| {
189 if let Self::Cfd(c) = i {
190 c
191 } else {
192 unreachable!()
193 }
194 })
195 .cloned()
196 .collect();
197 Cfd::encode_batch(metadata, &cfds)
198 }
199 "Commodity" => {
200 let commodities: Vec<_> = instruments
201 .iter()
202 .map(|i| {
203 if let Self::Commodity(c) = i {
204 c
205 } else {
206 unreachable!()
207 }
208 })
209 .cloned()
210 .collect();
211 Commodity::encode_batch(metadata, &commodities)
212 }
213 "BettingInstrument" => {
214 let betting: Vec<_> = instruments
215 .iter()
216 .map(|i| {
217 if let Self::Betting(b) = i {
218 b
219 } else {
220 unreachable!()
221 }
222 })
223 .cloned()
224 .collect();
225 BettingInstrument::encode_batch(metadata, &betting)
226 }
227 "BinaryOption" => {
228 let binary_options: Vec<_> = instruments
229 .iter()
230 .map(|i| {
231 if let Self::BinaryOption(bo) = i {
232 bo
233 } else {
234 unreachable!()
235 }
236 })
237 .cloned()
238 .collect();
239 BinaryOption::encode_batch(metadata, &binary_options)
240 }
241 "CryptoFuture" => {
242 let crypto_futures: Vec<_> = instruments
243 .iter()
244 .map(|i| {
245 if let Self::CryptoFuture(cf) = i {
246 cf
247 } else {
248 unreachable!()
249 }
250 })
251 .cloned()
252 .collect();
253 CryptoFuture::encode_batch(metadata, &crypto_futures)
254 }
255 "CryptoFuturesSpread" => {
256 let spreads: Vec<_> = instruments
257 .iter()
258 .map(|i| {
259 if let Self::CryptoFuturesSpread(cfs) = i {
260 cfs
261 } else {
262 unreachable!()
263 }
264 })
265 .cloned()
266 .collect();
267 CryptoFuturesSpread::encode_batch(metadata, &spreads)
268 }
269 "CryptoOption" => {
270 let crypto_options: Vec<_> = instruments
271 .iter()
272 .map(|i| {
273 if let Self::CryptoOption(co) = i {
274 co
275 } else {
276 unreachable!()
277 }
278 })
279 .cloned()
280 .collect();
281 CryptoOption::encode_batch(metadata, &crypto_options)
282 }
283 "CryptoOptionSpread" => {
284 let spreads: Vec<_> = instruments
285 .iter()
286 .map(|i| {
287 if let Self::CryptoOptionSpread(cos) = i {
288 cos
289 } else {
290 unreachable!()
291 }
292 })
293 .cloned()
294 .collect();
295 CryptoOptionSpread::encode_batch(metadata, &spreads)
296 }
297 "CryptoPerpetual" => {
298 let crypto_perps: Vec<_> = instruments
299 .iter()
300 .map(|i| {
301 if let Self::CryptoPerpetual(cp) = i {
302 cp
303 } else {
304 unreachable!()
305 }
306 })
307 .cloned()
308 .collect();
309 CryptoPerpetual::encode_batch(metadata, &crypto_perps)
310 }
311 "CurrencyPair" => {
312 let currency_pairs: Vec<_> = instruments
313 .iter()
314 .map(|i| {
315 if let Self::CurrencyPair(cp) = i {
316 cp
317 } else {
318 unreachable!()
319 }
320 })
321 .cloned()
322 .collect();
323 CurrencyPair::encode_batch(metadata, ¤cy_pairs)
324 }
325 "Equity" => {
326 let equities: Vec<_> = instruments
327 .iter()
328 .map(|i| {
329 if let Self::Equity(e) = i {
330 e
331 } else {
332 unreachable!()
333 }
334 })
335 .cloned()
336 .collect();
337 Equity::encode_batch(metadata, &equities)
338 }
339 "FuturesContract" => {
340 let futures_contracts: Vec<_> = instruments
341 .iter()
342 .map(|i| {
343 if let Self::FuturesContract(fc) = i {
344 fc
345 } else {
346 unreachable!()
347 }
348 })
349 .cloned()
350 .collect();
351 FuturesContract::encode_batch(metadata, &futures_contracts)
352 }
353 "FuturesSpread" => {
354 let futures_spreads: Vec<_> = instruments
355 .iter()
356 .map(|i| {
357 if let Self::FuturesSpread(fs) = i {
358 fs
359 } else {
360 unreachable!()
361 }
362 })
363 .cloned()
364 .collect();
365 FuturesSpread::encode_batch(metadata, &futures_spreads)
366 }
367 "IndexInstrument" => {
368 let index_instruments: Vec<_> = instruments
369 .iter()
370 .map(|i| {
371 if let Self::IndexInstrument(ii) = i {
372 ii
373 } else {
374 unreachable!()
375 }
376 })
377 .cloned()
378 .collect();
379 IndexInstrument::encode_batch(metadata, &index_instruments)
380 }
381 "OptionContract" => {
382 let option_contracts: Vec<_> = instruments
383 .iter()
384 .map(|i| {
385 if let Self::OptionContract(oc) = i {
386 oc
387 } else {
388 unreachable!()
389 }
390 })
391 .cloned()
392 .collect();
393 OptionContract::encode_batch(metadata, &option_contracts)
394 }
395 "OptionSpread" => {
396 let option_spreads: Vec<_> = instruments
397 .iter()
398 .map(|i| {
399 if let Self::OptionSpread(os) = i {
400 os
401 } else {
402 unreachable!()
403 }
404 })
405 .cloned()
406 .collect();
407 OptionSpread::encode_batch(metadata, &option_spreads)
408 }
409 "PerpetualContract" => {
410 let perpetual_contracts: Vec<_> = instruments
411 .iter()
412 .map(|i| {
413 if let Self::PerpetualContract(pc) = i {
414 pc
415 } else {
416 unreachable!()
417 }
418 })
419 .cloned()
420 .collect();
421 PerpetualContract::encode_batch(metadata, &perpetual_contracts)
422 }
423 "TokenizedAsset" => {
424 let tokenized_assets: Vec<_> = instruments
425 .iter()
426 .map(|i| {
427 if let Self::TokenizedAsset(ta) = i {
428 ta
429 } else {
430 unreachable!()
431 }
432 })
433 .cloned()
434 .collect();
435 TokenizedAsset::encode_batch(metadata, &tokenized_assets)
436 }
437 _ => Err(ArrowError::InvalidArgumentError(format!(
438 "Instrument type {type_name} serialization not yet implemented"
439 ))),
440 }
441 }
442
443 fn metadata(&self) -> HashMap<String, String> {
444 let mut metadata = HashMap::new();
445 metadata.insert(
446 KEY_INSTRUMENT_ID.to_string(),
447 Instrument::id(self).to_string(),
448 );
449
450 let type_name = match self {
451 Self::Cfd(_) => "Cfd",
452 Self::Commodity(_) => "Commodity",
453 Self::CurrencyPair(_) => "CurrencyPair",
454 Self::Equity(_) => "Equity",
455 Self::CryptoFuture(_) => "CryptoFuture",
456 Self::CryptoFuturesSpread(_) => "CryptoFuturesSpread",
457 Self::CryptoPerpetual(_) => "CryptoPerpetual",
458 Self::CryptoOption(_) => "CryptoOption",
459 Self::CryptoOptionSpread(_) => "CryptoOptionSpread",
460 Self::FuturesContract(_) => "FuturesContract",
461 Self::FuturesSpread(_) => "FuturesSpread",
462 Self::IndexInstrument(_) => "IndexInstrument",
463 Self::OptionContract(_) => "OptionContract",
464 Self::OptionSpread(_) => "OptionSpread",
465 Self::BinaryOption(_) => "BinaryOption",
466 Self::Betting(_) => "BettingInstrument",
467 Self::PerpetualContract(_) => "PerpetualContract",
468 Self::TokenizedAsset(_) => "TokenizedAsset",
469 };
470 metadata.insert("class".to_string(), type_name.to_string());
471 metadata
472 }
473}
474
475pub fn decode_instrument_any_batch(
482 #[allow(unused)] metadata: &HashMap<String, String>,
483 record_batch: &RecordBatch,
484) -> Result<Vec<InstrumentAny>, EncodingError> {
485 let type_name = metadata
486 .get("class")
487 .map(String::as_str)
488 .ok_or_else(|| EncodingError::MissingMetadata("class"))?;
489
490 match type_name {
491 "Cfd" => {
492 let cfds = cfd::decode_cfd_batch(metadata, record_batch)?;
493 Ok(cfds.into_iter().map(InstrumentAny::Cfd).collect())
494 }
495 "Commodity" => {
496 let commodities = commodity::decode_commodity_batch(metadata, record_batch)?;
497 Ok(commodities
498 .into_iter()
499 .map(InstrumentAny::Commodity)
500 .collect())
501 }
502 "BettingInstrument" => {
503 let betting = betting::decode_betting_instrument_batch(metadata, record_batch)?;
504 Ok(betting.into_iter().map(InstrumentAny::Betting).collect())
505 }
506 "BinaryOption" => {
507 let binary_options = binary_option::decode_binary_option_batch(metadata, record_batch)?;
508 Ok(binary_options
509 .into_iter()
510 .map(InstrumentAny::BinaryOption)
511 .collect())
512 }
513 "CryptoFuture" => {
514 let crypto_futures = crypto_future::decode_crypto_future_batch(metadata, record_batch)?;
515 Ok(crypto_futures
516 .into_iter()
517 .map(InstrumentAny::CryptoFuture)
518 .collect())
519 }
520 "CryptoFuturesSpread" => {
521 let spreads =
522 crypto_futures_spread::decode_crypto_futures_spread_batch(metadata, record_batch)?;
523 Ok(spreads
524 .into_iter()
525 .map(InstrumentAny::CryptoFuturesSpread)
526 .collect())
527 }
528 "CryptoOption" => {
529 let crypto_options = crypto_option::decode_crypto_option_batch(metadata, record_batch)?;
530 Ok(crypto_options
531 .into_iter()
532 .map(InstrumentAny::CryptoOption)
533 .collect())
534 }
535 "CryptoOptionSpread" => {
536 let spreads =
537 crypto_option_spread::decode_crypto_option_spread_batch(metadata, record_batch)?;
538 Ok(spreads
539 .into_iter()
540 .map(InstrumentAny::CryptoOptionSpread)
541 .collect())
542 }
543 "CryptoPerpetual" => {
544 let crypto_perps =
545 crypto_perpetual::decode_crypto_perpetual_batch(metadata, record_batch)?;
546 Ok(crypto_perps
547 .into_iter()
548 .map(InstrumentAny::CryptoPerpetual)
549 .collect())
550 }
551 "CurrencyPair" => {
552 let currency_pairs = currency_pair::decode_currency_pair_batch(metadata, record_batch)?;
553 Ok(currency_pairs
554 .into_iter()
555 .map(InstrumentAny::CurrencyPair)
556 .collect())
557 }
558 "Equity" => {
559 let equities = equity::decode_equity_batch(metadata, record_batch)?;
560 Ok(equities.into_iter().map(InstrumentAny::Equity).collect())
561 }
562 "FuturesContract" => {
563 let futures_contracts =
564 futures_contract::decode_futures_contract_batch(metadata, record_batch)?;
565 Ok(futures_contracts
566 .into_iter()
567 .map(InstrumentAny::FuturesContract)
568 .collect())
569 }
570 "FuturesSpread" => {
571 let futures_spreads =
572 futures_spread::decode_futures_spread_batch(metadata, record_batch)?;
573 Ok(futures_spreads
574 .into_iter()
575 .map(InstrumentAny::FuturesSpread)
576 .collect())
577 }
578 "IndexInstrument" => {
579 let index_instruments =
580 index_instrument::decode_index_instrument_batch(metadata, record_batch)?;
581 Ok(index_instruments
582 .into_iter()
583 .map(InstrumentAny::IndexInstrument)
584 .collect())
585 }
586 "OptionContract" => {
587 let option_contracts =
588 option_contract::decode_option_contract_batch(metadata, record_batch)?;
589 Ok(option_contracts
590 .into_iter()
591 .map(InstrumentAny::OptionContract)
592 .collect())
593 }
594 "OptionSpread" => {
595 let option_spreads = option_spread::decode_option_spread_batch(metadata, record_batch)?;
596 Ok(option_spreads
597 .into_iter()
598 .map(InstrumentAny::OptionSpread)
599 .collect())
600 }
601 "PerpetualContract" => {
602 let perpetual_contracts =
603 perpetual_contract::decode_perpetual_contract_batch(metadata, record_batch)?;
604 Ok(perpetual_contracts
605 .into_iter()
606 .map(InstrumentAny::PerpetualContract)
607 .collect())
608 }
609 "TokenizedAsset" => {
610 let tokenized_assets =
611 tokenized_asset::decode_tokenized_asset_batch(metadata, record_batch)?;
612 Ok(tokenized_assets
613 .into_iter()
614 .map(InstrumentAny::TokenizedAsset)
615 .collect())
616 }
617 _ => Err(EncodingError::ParseError(
618 "class",
619 format!("Unknown instrument type: {type_name}"),
620 )),
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use std::sync::Arc;
627
628 use arrow::array::{ArrayRef, StringArray, UInt8Array};
629 use nautilus_core::UnixNanos;
630 use nautilus_model::{
631 enums::CurrencyType,
632 identifiers::{InstrumentId, Symbol},
633 instruments::{Instrument, InstrumentAny, currency_pair::CurrencyPair},
634 types::{Currency, Price, Quantity},
635 };
636 use rstest::rstest;
637 use ustr::Ustr;
638
639 use super::*;
640
641 #[rstest]
642 fn test_get_schema() {
643 let mut metadata = HashMap::new();
644 metadata.insert("class".to_string(), "CurrencyPair".to_string());
645 let schema = InstrumentAny::get_schema(Some(metadata));
646 assert!(schema.fields().len() >= 20);
647 assert_eq!(schema.field(0).name(), "id");
648 }
649
650 #[rstest]
651 #[case("")]
652 #[case(" ")]
653 #[case("\t\n")]
654 fn test_decode_currency_empty_or_whitespace_errors(#[case] value: &str) {
655 let result = decode_currency(value, "currency", "test.currency", 7);
656 let err = result.expect_err("empty code must surface EncodingError");
657 match err {
658 EncodingError::ParseError(field, msg) => {
659 assert_eq!(field, "currency");
660 assert!(
661 msg.contains("row 7"),
662 "message should include row index, found: {msg}",
663 );
664 assert!(
665 msg.contains("empty currency code"),
666 "message should describe empty code, found: {msg}",
667 );
668 }
669 other => panic!("unexpected error variant: {other:?}"),
670 }
671 assert!(Currency::try_from_str(value.trim()).is_none());
673 }
674
675 #[rstest]
676 #[case("USD", CurrencyType::Fiat, 2)]
677 #[case("BTC", CurrencyType::Crypto, 8)]
678 #[case("XAU", CurrencyType::CommodityBacked, 2)]
679 fn test_decode_currency_known_code_preserves_metadata(
680 #[case] code: &str,
681 #[case] expected_type: CurrencyType,
682 #[case] expected_precision: u8,
683 ) {
684 let currency = decode_currency(code, "currency", "test.currency", 0).unwrap();
685 assert_eq!(currency.code.as_str(), code);
686 assert_eq!(currency.currency_type, expected_type);
687 assert_eq!(currency.precision, expected_precision);
688 }
689
690 #[rstest]
691 fn test_decode_currency_unknown_code_registers_as_crypto() {
692 let code = "XDECTEST";
693 assert!(
694 Currency::try_from_str(code).is_none(),
695 "test precondition: '{code}' must not be pre-registered",
696 );
697
698 let currency = decode_currency(code, "base_currency", "test.base_currency", 0).unwrap();
699 assert_eq!(currency.code.as_str(), code);
700 assert_eq!(currency.currency_type, CurrencyType::Crypto);
701 assert_eq!(currency.precision, 8);
702 assert_eq!(currency.iso4217, 0);
703
704 let registered = Currency::try_from_str(code).expect("unknown code must be registered");
705 assert_eq!(registered, currency);
706 }
707
708 #[rstest]
709 fn test_encode_decode_round_trip() {
710 let instrument_id = InstrumentId::from("EUR/USD.SIM");
711 let currency_pair = CurrencyPair::new(
712 instrument_id,
713 Symbol::from("EUR/USD"),
714 Currency::from("EUR"),
715 Currency::from("USD"),
716 5,
717 0, Price::new(0.00001, 5),
719 Quantity::new(1.0, 0), None, None, None, None, None, None, None, None, None, None, None, None, Some(Ustr::from("FOREX_5DECIMAL")),
733 None, UnixNanos::default(),
735 UnixNanos::default(),
736 );
737 let instrument = InstrumentAny::CurrencyPair(currency_pair);
738
739 let metadata = instrument.metadata();
740 let record_batch =
741 InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
742 let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
743
744 assert_eq!(decoded.len(), 1);
745 assert_eq!(Instrument::id(&decoded[0]), Instrument::id(&instrument));
746 assert_eq!(
747 Instrument::raw_symbol(&decoded[0]),
748 Instrument::raw_symbol(&instrument)
749 );
750 assert_eq!(
751 Instrument::asset_class(&decoded[0]),
752 Instrument::asset_class(&instrument)
753 );
754
755 match (&decoded[0], &instrument) {
756 (InstrumentAny::CurrencyPair(decoded_cp), InstrumentAny::CurrencyPair(original_cp)) => {
757 assert_eq!(decoded_cp.id, original_cp.id);
758 assert_eq!(decoded_cp.base_currency, original_cp.base_currency);
759 assert_eq!(decoded_cp.quote_currency, original_cp.quote_currency);
760 assert_eq!(decoded_cp.price_precision, original_cp.price_precision);
761 assert_eq!(decoded_cp.size_precision, original_cp.size_precision);
762 assert_eq!(decoded_cp.tick_scheme, original_cp.tick_scheme);
763 }
764 _ => panic!("Decoded instrument type mismatch"),
765 }
766 }
767
768 #[rstest]
769 fn test_decode_currency_pair_without_tick_scheme_column_defaults_none() {
770 let instrument_id = InstrumentId::from("EUR/USD.SIM");
771 let currency_pair = CurrencyPair::new(
772 instrument_id,
773 Symbol::from("EUR/USD"),
774 Currency::from("EUR"),
775 Currency::from("USD"),
776 5,
777 0,
778 Price::new(0.00001, 5),
779 Quantity::new(1.0, 0),
780 None, None, None, None, None, None, None, None, None, None, None, None, Some(Ustr::from("FOREX_5DECIMAL")),
793 None, UnixNanos::default(),
795 UnixNanos::default(),
796 );
797 let instrument = InstrumentAny::CurrencyPair(currency_pair);
798
799 let metadata = instrument.metadata();
800 let record_batch =
801 InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
802 let record_batch = batch_without_column(&record_batch, "tick_scheme");
803 let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
804
805 assert_eq!(decoded.len(), 1);
806 match &decoded[0] {
807 InstrumentAny::CurrencyPair(decoded_cp) => {
808 assert_eq!(decoded_cp.id, instrument.id());
809 assert_eq!(decoded_cp.tick_scheme, None);
810 }
811 _ => panic!("Decoded instrument type mismatch"),
812 }
813 }
814
815 #[rstest]
816 fn test_encode_decode_round_trip_equity() {
817 use nautilus_model::instruments::{Instrument, equity::Equity};
818
819 let instrument_id = InstrumentId::from("AAPL.NASDAQ");
820 let equity = Equity::new(
821 instrument_id,
822 Symbol::from("AAPL"),
823 None, Currency::from("USD"),
825 2,
826 Price::new(0.01, 2),
827 None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
839 UnixNanos::default(),
840 );
841 let instrument = InstrumentAny::Equity(equity);
842
843 let metadata = instrument.metadata();
844 let record_batch =
845 InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
846 let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
847 assert_eq!(decoded.len(), 1);
848 assert_eq!(Instrument::id(&decoded[0]), Instrument::id(&instrument));
849 assert_eq!(
850 Instrument::raw_symbol(&decoded[0]),
851 Instrument::raw_symbol(&instrument)
852 );
853 assert_eq!(
854 Instrument::asset_class(&decoded[0]),
855 Instrument::asset_class(&instrument)
856 );
857
858 match (&decoded[0], &instrument) {
859 (InstrumentAny::Equity(decoded_eq), InstrumentAny::Equity(original_eq)) => {
860 assert_eq!(decoded_eq.id, original_eq.id);
861 assert_eq!(decoded_eq.currency, original_eq.currency);
862 assert_eq!(decoded_eq.price_precision, original_eq.price_precision);
863 }
864 _ => panic!("Decoded instrument type mismatch"),
865 }
866 }
867
868 fn roundtrip_case(instrument: &InstrumentAny) {
869 let metadata = instrument.metadata();
870 let record_batch =
871 InstrumentAny::encode_batch(&metadata, std::slice::from_ref(instrument)).unwrap();
872 let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
873
874 assert_eq!(decoded.len(), 1);
875 assert_eq!(Instrument::id(&decoded[0]), Instrument::id(instrument));
876 assert_eq!(
877 Instrument::raw_symbol(&decoded[0]),
878 Instrument::raw_symbol(instrument)
879 );
880 assert_eq!(
881 Instrument::asset_class(&decoded[0]),
882 Instrument::asset_class(instrument)
883 );
884 assert_eq!(
885 Instrument::instrument_class(&decoded[0]),
886 Instrument::instrument_class(instrument)
887 );
888 assert_eq!(
889 Instrument::price_precision(&decoded[0]),
890 Instrument::price_precision(instrument)
891 );
892 assert_eq!(
893 Instrument::size_precision(&decoded[0]),
894 Instrument::size_precision(instrument)
895 );
896 assert_eq!(
897 Instrument::quote_currency(&decoded[0]),
898 Instrument::quote_currency(instrument)
899 );
900 assert_eq!(
901 std::mem::discriminant(&decoded[0]),
902 std::mem::discriminant(instrument),
903 "decoded variant must match encoded variant"
904 );
905 }
906
907 fn batch_without_column(record_batch: &RecordBatch, column_name: &str) -> RecordBatch {
908 let schema = record_batch.schema();
909 let column_index = schema.index_of(column_name).unwrap();
910 let fields: Vec<_> = schema
911 .fields()
912 .iter()
913 .enumerate()
914 .filter(|(index, _)| *index != column_index)
915 .map(|(_, field)| field.as_ref().clone())
916 .collect();
917 let columns = record_batch
918 .columns()
919 .iter()
920 .enumerate()
921 .filter(|(index, _)| *index != column_index)
922 .map(|(_, column)| Arc::clone(column))
923 .collect();
924 let new_schema = Schema::new_with_metadata(fields, schema.metadata().clone());
925
926 RecordBatch::try_new(Arc::new(new_schema), columns).unwrap()
927 }
928
929 fn batch_with_null_string_column(record_batch: &RecordBatch, column_name: &str) -> RecordBatch {
930 let schema = record_batch.schema();
931 let column_index = schema.index_of(column_name).unwrap();
932 let mut columns = record_batch.columns().to_vec();
933 let null_column: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
934 columns[column_index] = null_column;
935
936 RecordBatch::try_new(schema, columns).unwrap()
937 }
938
939 fn batch_with_uint8_column(
940 record_batch: &RecordBatch,
941 column_name: &str,
942 values: Vec<u8>,
943 ) -> RecordBatch {
944 let schema = record_batch.schema();
945 let column_index = schema.index_of(column_name).unwrap();
946 let mut columns = record_batch.columns().to_vec();
947 columns[column_index] = Arc::new(UInt8Array::from(values));
948
949 RecordBatch::try_new(schema, columns).unwrap()
950 }
951
952 #[rstest]
953 #[case::binary_option(InstrumentAny::BinaryOption(
954 nautilus_model::instruments::stubs::binary_option()
955 ))]
956 #[case::cfd(InstrumentAny::Cfd(nautilus_model::instruments::stubs::cfd_gold()))]
957 #[case::commodity(InstrumentAny::Commodity(
958 nautilus_model::instruments::stubs::commodity_gold()
959 ))]
960 #[case::crypto_future(InstrumentAny::CryptoFuture(
961 nautilus_model::instruments::stubs::crypto_future_btcusdt(
962 2,
963 6,
964 Price::from("0.01"),
965 Quantity::from("0.000001"),
966 )
967 ))]
968 #[case::crypto_futures_spread(InstrumentAny::CryptoFuturesSpread(
969 nautilus_model::instruments::stubs::crypto_futures_spread_btc_deribit()
970 ))]
971 #[case::crypto_option(InstrumentAny::CryptoOption(
972 nautilus_model::instruments::stubs::crypto_option_btc_deribit(
973 3,
974 1,
975 Price::from("0.001"),
976 Quantity::from("0.1"),
977 )
978 ))]
979 #[case::crypto_option_spread(InstrumentAny::CryptoOptionSpread(
980 nautilus_model::instruments::stubs::crypto_option_spread_btc_deribit()
981 ))]
982 #[case::crypto_perpetual(InstrumentAny::CryptoPerpetual(
983 nautilus_model::instruments::stubs::crypto_perpetual_ethusdt()
984 ))]
985 #[case::currency_pair(InstrumentAny::CurrencyPair(
986 nautilus_model::instruments::stubs::currency_pair_btcusdt()
987 ))]
988 #[case::equity(InstrumentAny::Equity(nautilus_model::instruments::stubs::equity_aapl()))]
989 #[case::futures_contract(InstrumentAny::FuturesContract(
990 nautilus_model::instruments::stubs::futures_contract_es(None, None,)
991 ))]
992 #[case::futures_spread(InstrumentAny::FuturesSpread(
993 nautilus_model::instruments::stubs::futures_spread_es()
994 ))]
995 #[case::index_instrument(InstrumentAny::IndexInstrument(
996 nautilus_model::instruments::stubs::index_instrument_spx()
997 ))]
998 #[case::option_contract(InstrumentAny::OptionContract(
999 nautilus_model::instruments::stubs::option_contract_appl()
1000 ))]
1001 #[case::option_spread(InstrumentAny::OptionSpread(
1002 nautilus_model::instruments::stubs::option_spread()
1003 ))]
1004 #[case::perpetual_contract(InstrumentAny::PerpetualContract(
1005 nautilus_model::instruments::stubs::perpetual_contract_eurusd()
1006 ))]
1007 #[case::tokenized_asset(InstrumentAny::TokenizedAsset(
1008 nautilus_model::instruments::stubs::tokenized_asset_aaplx()
1009 ))]
1010 fn test_decode_instrument_checked_constructor_error(#[case] instrument: InstrumentAny) {
1011 let metadata = instrument.metadata();
1012 let class = metadata.get("class").unwrap();
1013 let first_row_price_precision = Instrument::price_precision(&instrument);
1014 let instruments = vec![instrument.clone(), instrument];
1015 let record_batch = InstrumentAny::encode_batch(&metadata, &instruments).unwrap();
1016 let record_batch = batch_with_uint8_column(
1017 &record_batch,
1018 "price_precision",
1019 vec![first_row_price_precision, u8::MAX],
1020 );
1021
1022 let error = decode_instrument_any_batch(&metadata, &record_batch)
1023 .expect_err("invalid precision must return EncodingError");
1024
1025 match error {
1026 EncodingError::ParseError(field, message) => {
1027 assert_eq!(field, INSTRUMENT_VALIDATION_FIELD);
1028 assert!(
1029 message.contains(class),
1030 "message should include instrument class, found: {message}",
1031 );
1032 assert!(
1033 message.starts_with("row 1:"),
1034 "message should include row index, found: {message}",
1035 );
1036 assert!(
1037 message.contains("price_precision"),
1038 "message should include failed precision, found: {message}",
1039 );
1040 }
1041 other => panic!("unexpected error variant: {other:?}"),
1042 }
1043 }
1044
1045 #[rstest]
1046 fn test_roundtrip_betting() {
1047 use nautilus_model::instruments::stubs::betting;
1048 roundtrip_case(&InstrumentAny::Betting(betting()));
1049 }
1050
1051 #[rstest]
1052 fn test_roundtrip_binary_option() {
1053 use nautilus_model::instruments::stubs::binary_option;
1054 roundtrip_case(&InstrumentAny::BinaryOption(binary_option()));
1055 }
1056
1057 #[rstest]
1058 fn test_roundtrip_cfd() {
1059 use nautilus_model::instruments::stubs::cfd_gold;
1060 roundtrip_case(&InstrumentAny::Cfd(cfd_gold()));
1061 }
1062
1063 #[rstest]
1064 fn test_roundtrip_commodity() {
1065 use nautilus_model::instruments::stubs::commodity_gold;
1066 roundtrip_case(&InstrumentAny::Commodity(commodity_gold()));
1067 }
1068
1069 #[rstest]
1070 fn test_roundtrip_crypto_future() {
1071 use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1072
1073 let mut inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1074 inst.lot_size = Quantity::from("0.25");
1075 let any = InstrumentAny::CryptoFuture(inst.clone());
1076 roundtrip_case(&any);
1077 let metadata = any.metadata();
1078 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1079 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1080 let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1081 panic!("decoded variant is not CryptoFuture");
1082 };
1083 assert_eq!(decoded_inst.lot_size, inst.lot_size);
1084 }
1085
1086 #[rstest]
1087 fn test_decode_crypto_future_without_lot_size_column_defaults_to_one() {
1088 use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1089
1090 let inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1091 let any = InstrumentAny::CryptoFuture(inst);
1092 let metadata = any.metadata();
1093 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1094 let batch = batch_without_column(&batch, "lot_size");
1095
1096 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1097
1098 let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1099 panic!("decoded variant is not CryptoFuture");
1100 };
1101 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1102 }
1103
1104 #[rstest]
1105 fn test_decode_crypto_future_null_lot_size_defaults_to_one() {
1106 use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1107
1108 let inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1109 let any = InstrumentAny::CryptoFuture(inst);
1110 let metadata = any.metadata();
1111 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1112 let batch = batch_with_null_string_column(&batch, "lot_size");
1113
1114 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1115
1116 let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1117 panic!("decoded variant is not CryptoFuture");
1118 };
1119 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1120 }
1121
1122 #[rstest]
1123 fn test_roundtrip_crypto_option() {
1124 use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1125
1126 let mut inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1127 inst.lot_size = Quantity::from("0.5");
1128 let any = InstrumentAny::CryptoOption(inst.clone());
1129 roundtrip_case(&any);
1130 let metadata = any.metadata();
1131 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1132 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1133 let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1134 panic!("decoded variant is not CryptoOption");
1135 };
1136 assert_eq!(decoded_inst.lot_size, inst.lot_size);
1137 }
1138
1139 #[rstest]
1140 fn test_decode_crypto_option_without_lot_size_column_defaults_to_one() {
1141 use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1142
1143 let inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1144 let any = InstrumentAny::CryptoOption(inst);
1145 let metadata = any.metadata();
1146 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1147 let batch = batch_without_column(&batch, "lot_size");
1148
1149 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1150
1151 let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1152 panic!("decoded variant is not CryptoOption");
1153 };
1154 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1155 }
1156
1157 #[rstest]
1158 fn test_decode_crypto_option_null_lot_size_defaults_to_one() {
1159 use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1160
1161 let inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1162 let any = InstrumentAny::CryptoOption(inst);
1163 let metadata = any.metadata();
1164 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1165 let batch = batch_with_null_string_column(&batch, "lot_size");
1166
1167 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1168
1169 let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1170 panic!("decoded variant is not CryptoOption");
1171 };
1172 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1173 }
1174
1175 #[rstest]
1176 fn test_roundtrip_crypto_futures_spread() {
1177 use nautilus_model::instruments::{Instrument, stubs::crypto_futures_spread_btc_deribit};
1178 let inst = crypto_futures_spread_btc_deribit();
1179 let any = InstrumentAny::CryptoFuturesSpread(inst.clone());
1180 roundtrip_case(&any);
1181 let metadata = any.metadata();
1182 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1183 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1184 let InstrumentAny::CryptoFuturesSpread(decoded_inst) = &decoded[0] else {
1185 panic!("decoded variant is not CryptoFuturesSpread");
1186 };
1187 assert_eq!(decoded_inst.lot_size, inst.lot_size);
1188 assert_eq!(decoded_inst.is_inverse, inst.is_inverse);
1189 assert_eq!(decoded_inst.strategy_type, inst.strategy_type);
1190 assert_eq!(decoded_inst.settlement_currency, inst.settlement_currency);
1191 assert_eq!(Instrument::id(decoded_inst), Instrument::id(&inst));
1192 }
1193
1194 #[rstest]
1195 fn test_roundtrip_crypto_option_spread() {
1196 use nautilus_model::instruments::{Instrument, stubs::crypto_option_spread_btc_deribit};
1197 let inst = crypto_option_spread_btc_deribit();
1198 let any = InstrumentAny::CryptoOptionSpread(inst.clone());
1199 roundtrip_case(&any);
1200 let metadata = any.metadata();
1201 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1202 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1203 let InstrumentAny::CryptoOptionSpread(decoded_inst) = &decoded[0] else {
1204 panic!("decoded variant is not CryptoOptionSpread");
1205 };
1206 assert_eq!(decoded_inst.lot_size, inst.lot_size);
1210 assert_eq!(decoded_inst.size_precision, inst.size_precision);
1211 assert_eq!(decoded_inst.size_increment, inst.size_increment);
1212 assert_eq!(decoded_inst.is_inverse, inst.is_inverse);
1213 assert_eq!(decoded_inst.strategy_type, inst.strategy_type);
1214 assert_eq!(decoded_inst.settlement_currency, inst.settlement_currency);
1215 assert_eq!(Instrument::id(decoded_inst), Instrument::id(&inst));
1216 }
1217
1218 #[rstest]
1219 fn test_roundtrip_crypto_perpetual_inverse() {
1220 use nautilus_model::instruments::stubs::xbtusd_bitmex;
1221 roundtrip_case(&InstrumentAny::CryptoPerpetual(xbtusd_bitmex()));
1222 }
1223
1224 #[rstest]
1225 fn test_roundtrip_crypto_perpetual_linear() {
1226 use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1227
1228 let mut inst = crypto_perpetual_ethusdt();
1229 inst.lot_size = Quantity::from("0.005");
1230 let any = InstrumentAny::CryptoPerpetual(inst.clone());
1231 roundtrip_case(&any);
1232 let metadata = any.metadata();
1233 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1234 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1235 let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1236 panic!("decoded variant is not CryptoPerpetual");
1237 };
1238 assert_eq!(decoded_inst.lot_size, inst.lot_size);
1239 }
1240
1241 #[rstest]
1242 fn test_decode_crypto_perpetual_without_lot_size_column_defaults_to_one() {
1243 use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1244
1245 let inst = crypto_perpetual_ethusdt();
1246 let any = InstrumentAny::CryptoPerpetual(inst);
1247 let metadata = any.metadata();
1248 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1249 let batch = batch_without_column(&batch, "lot_size");
1250
1251 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1252
1253 let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1254 panic!("decoded variant is not CryptoPerpetual");
1255 };
1256 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1257 }
1258
1259 #[rstest]
1260 fn test_decode_crypto_perpetual_null_lot_size_defaults_to_one() {
1261 use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1262
1263 let inst = crypto_perpetual_ethusdt();
1264 let any = InstrumentAny::CryptoPerpetual(inst);
1265 let metadata = any.metadata();
1266 let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1267 let batch = batch_with_null_string_column(&batch, "lot_size");
1268
1269 let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1270
1271 let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1272 panic!("decoded variant is not CryptoPerpetual");
1273 };
1274 assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1275 }
1276
1277 #[rstest]
1278 fn test_roundtrip_futures_contract() {
1279 use nautilus_model::instruments::stubs::futures_contract_es;
1280 roundtrip_case(&InstrumentAny::FuturesContract(futures_contract_es(
1281 None, None,
1282 )));
1283 }
1284
1285 #[rstest]
1286 fn test_roundtrip_futures_spread() {
1287 use nautilus_model::instruments::stubs::futures_spread_es;
1288 roundtrip_case(&InstrumentAny::FuturesSpread(futures_spread_es()));
1289 }
1290
1291 #[rstest]
1292 fn test_roundtrip_index_instrument() {
1293 use nautilus_model::instruments::stubs::index_instrument_spx;
1294 roundtrip_case(&InstrumentAny::IndexInstrument(index_instrument_spx()));
1295 }
1296
1297 #[rstest]
1298 fn test_roundtrip_option_contract() {
1299 use nautilus_model::instruments::stubs::option_contract_appl;
1300 roundtrip_case(&InstrumentAny::OptionContract(option_contract_appl()));
1301 }
1302
1303 #[rstest]
1304 fn test_roundtrip_option_spread() {
1305 use nautilus_model::instruments::stubs::option_spread;
1306 roundtrip_case(&InstrumentAny::OptionSpread(option_spread()));
1307 }
1308
1309 #[rstest]
1310 fn test_roundtrip_perpetual_contract() {
1311 use nautilus_model::instruments::stubs::perpetual_contract_eurusd;
1312 roundtrip_case(&InstrumentAny::PerpetualContract(
1313 perpetual_contract_eurusd(),
1314 ));
1315 }
1316
1317 #[rstest]
1318 fn test_roundtrip_tokenized_asset() {
1319 use nautilus_model::instruments::stubs::tokenized_asset_aaplx;
1320 roundtrip_case(&InstrumentAny::TokenizedAsset(tokenized_asset_aaplx()));
1321 }
1322}