Skip to main content

nautilus_serialization/arrow/instrument/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Arrow serialization for instruments.
17//!
18//! `InstrumentAny` acts as a dispatcher that routes to the appropriate concrete instrument type's
19//! Arrow serialization implementation. Each concrete instrument type implements its own schema
20//! with all fields as columns (wide schema approach), matching the Python implementation.
21
22use std::{any::type_name, collections::HashMap, fmt};
23
24use arrow::{datatypes::Schema, error::ArrowError, record_batch::RecordBatch};
25use nautilus_model::{
26    instruments::{
27        Instrument, InstrumentAny, betting::BettingInstrument, binary_option::BinaryOption,
28        cfd::Cfd, commodity::Commodity, crypto_future::CryptoFuture,
29        crypto_futures_spread::CryptoFuturesSpread, crypto_option::CryptoOption,
30        crypto_option_spread::CryptoOptionSpread, crypto_perpetual::CryptoPerpetual,
31        currency_pair::CurrencyPair, equity::Equity, futures_contract::FuturesContract,
32        futures_spread::FuturesSpread, index_instrument::IndexInstrument,
33        option_contract::OptionContract, option_spread::OptionSpread,
34        perpetual_contract::PerpetualContract, tokenized_asset::TokenizedAsset,
35    },
36    types::Currency,
37};
38
39#[allow(unused)]
40use crate::arrow::{
41    ArrowSchemaProvider, Data, DecodeDataFromRecordBatch, DecodeFromRecordBatch,
42    EncodeToRecordBatch, EncodingError, KEY_INSTRUMENT_ID,
43};
44
45pub mod betting;
46pub mod binary_option;
47pub mod cfd;
48pub mod commodity;
49pub mod crypto_future;
50pub mod crypto_futures_spread;
51pub mod crypto_option;
52pub mod crypto_option_spread;
53pub mod crypto_perpetual;
54pub mod currency_pair;
55pub mod equity;
56pub mod futures_contract;
57pub mod futures_spread;
58pub mod index_instrument;
59pub mod option_contract;
60pub mod option_spread;
61pub mod perpetual_contract;
62pub mod tokenized_asset;
63
64// Errors on empty/whitespace codes so corrupted rows surface as ParseError,
65// instead of silently registering as a fallback currency. Known codes resolve
66// from CURRENCY_MAP with original metadata; unknown non-empty codes fall back
67// to a new crypto currency to support newly listed exchange assets.
68pub(crate) fn decode_currency(
69    value: &str,
70    field: &'static str,
71    context: &'static str,
72    row: usize,
73) -> Result<Currency, EncodingError> {
74    let trimmed = value.trim();
75    if trimmed.is_empty() {
76        return Err(EncodingError::ParseError(
77            field,
78            format!("row {row}: empty currency code"),
79        ));
80    }
81
82    Ok(Currency::get_or_create_crypto_with_context(
83        trimmed,
84        Some(context),
85    ))
86}
87
88const INSTRUMENT_VALIDATION_FIELD: &str = "instrument";
89
90pub(crate) fn instrument_validation_error<T>(
91    row: usize,
92    error: impl fmt::Display,
93) -> EncodingError {
94    let type_name = type_name::<T>();
95    let instrument_type = type_name.rsplit("::").next().unwrap_or(type_name);
96
97    EncodingError::ParseError(
98        INSTRUMENT_VALIDATION_FIELD,
99        format!("row {row}: invalid {instrument_type}: {error}"),
100    )
101}
102
103impl ArrowSchemaProvider for InstrumentAny {
104    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
105        let instrument_type = metadata
106            .as_ref()
107            .and_then(|m| m.get("class"))
108            .map_or("CurrencyPair", |s| s.as_str());
109
110        match instrument_type {
111            "BettingInstrument" => BettingInstrument::get_schema(metadata),
112            "BinaryOption" => BinaryOption::get_schema(metadata),
113            "Cfd" => Cfd::get_schema(metadata),
114            "Commodity" => Commodity::get_schema(metadata),
115            "CryptoFuture" => CryptoFuture::get_schema(metadata),
116            "CryptoFuturesSpread" => CryptoFuturesSpread::get_schema(metadata),
117            "CryptoOption" => CryptoOption::get_schema(metadata),
118            "CryptoOptionSpread" => CryptoOptionSpread::get_schema(metadata),
119            "CryptoPerpetual" => CryptoPerpetual::get_schema(metadata),
120            "CurrencyPair" => CurrencyPair::get_schema(metadata),
121            "Equity" => Equity::get_schema(metadata),
122            "FuturesContract" => FuturesContract::get_schema(metadata),
123            "FuturesSpread" => FuturesSpread::get_schema(metadata),
124            "IndexInstrument" => IndexInstrument::get_schema(metadata),
125            "OptionContract" => OptionContract::get_schema(metadata),
126            "OptionSpread" => OptionSpread::get_schema(metadata),
127            "PerpetualContract" => PerpetualContract::get_schema(metadata),
128            "TokenizedAsset" => TokenizedAsset::get_schema(metadata),
129            _ => {
130                // Fallback to CurrencyPair schema if type is unknown
131                CurrencyPair::get_schema(metadata)
132            }
133        }
134    }
135}
136
137impl EncodeToRecordBatch for InstrumentAny {
138    fn encode_batch(
139        #[allow(unused)] metadata: &HashMap<String, String>,
140        data: &[Self],
141    ) -> Result<RecordBatch, ArrowError> {
142        if data.is_empty() {
143            return Err(ArrowError::InvalidArgumentError(
144                "Cannot encode empty instrument batch".to_string(),
145            ));
146        }
147
148        let mut by_type: HashMap<String, Vec<&Self>> = HashMap::new();
149
150        for instrument in data {
151            let type_name = match instrument {
152                Self::Cfd(_) => "Cfd",
153                Self::Commodity(_) => "Commodity",
154                Self::CurrencyPair(_) => "CurrencyPair",
155                Self::Equity(_) => "Equity",
156                Self::CryptoFuture(_) => "CryptoFuture",
157                Self::CryptoFuturesSpread(_) => "CryptoFuturesSpread",
158                Self::CryptoPerpetual(_) => "CryptoPerpetual",
159                Self::CryptoOption(_) => "CryptoOption",
160                Self::CryptoOptionSpread(_) => "CryptoOptionSpread",
161                Self::FuturesContract(_) => "FuturesContract",
162                Self::FuturesSpread(_) => "FuturesSpread",
163                Self::IndexInstrument(_) => "IndexInstrument",
164                Self::OptionContract(_) => "OptionContract",
165                Self::OptionSpread(_) => "OptionSpread",
166                Self::BinaryOption(_) => "BinaryOption",
167                Self::Betting(_) => "BettingInstrument",
168                Self::PerpetualContract(_) => "PerpetualContract",
169                Self::TokenizedAsset(_) => "TokenizedAsset",
170            };
171            by_type
172                .entry(type_name.to_string())
173                .or_default()
174                .push(instrument);
175        }
176
177        if by_type.len() > 1 {
178            return Err(ArrowError::InvalidArgumentError(
179                "Cannot encode mixed instrument types in a single batch. Use separate batches for each type.".to_string(),
180            ));
181        }
182
183        let (type_name, instruments) = by_type.iter().next().unwrap();
184        match type_name.as_str() {
185            "Cfd" => {
186                let cfds: Vec<_> = instruments
187                    .iter()
188                    .map(|i| {
189                        if let Self::Cfd(c) = i {
190                            c
191                        } else {
192                            unreachable!()
193                        }
194                    })
195                    .cloned()
196                    .collect();
197                Cfd::encode_batch(metadata, &cfds)
198            }
199            "Commodity" => {
200                let commodities: Vec<_> = instruments
201                    .iter()
202                    .map(|i| {
203                        if let Self::Commodity(c) = i {
204                            c
205                        } else {
206                            unreachable!()
207                        }
208                    })
209                    .cloned()
210                    .collect();
211                Commodity::encode_batch(metadata, &commodities)
212            }
213            "BettingInstrument" => {
214                let betting: Vec<_> = instruments
215                    .iter()
216                    .map(|i| {
217                        if let Self::Betting(b) = i {
218                            b
219                        } else {
220                            unreachable!()
221                        }
222                    })
223                    .cloned()
224                    .collect();
225                BettingInstrument::encode_batch(metadata, &betting)
226            }
227            "BinaryOption" => {
228                let binary_options: Vec<_> = instruments
229                    .iter()
230                    .map(|i| {
231                        if let Self::BinaryOption(bo) = i {
232                            bo
233                        } else {
234                            unreachable!()
235                        }
236                    })
237                    .cloned()
238                    .collect();
239                BinaryOption::encode_batch(metadata, &binary_options)
240            }
241            "CryptoFuture" => {
242                let crypto_futures: Vec<_> = instruments
243                    .iter()
244                    .map(|i| {
245                        if let Self::CryptoFuture(cf) = i {
246                            cf
247                        } else {
248                            unreachable!()
249                        }
250                    })
251                    .cloned()
252                    .collect();
253                CryptoFuture::encode_batch(metadata, &crypto_futures)
254            }
255            "CryptoFuturesSpread" => {
256                let spreads: Vec<_> = instruments
257                    .iter()
258                    .map(|i| {
259                        if let Self::CryptoFuturesSpread(cfs) = i {
260                            cfs
261                        } else {
262                            unreachable!()
263                        }
264                    })
265                    .cloned()
266                    .collect();
267                CryptoFuturesSpread::encode_batch(metadata, &spreads)
268            }
269            "CryptoOption" => {
270                let crypto_options: Vec<_> = instruments
271                    .iter()
272                    .map(|i| {
273                        if let Self::CryptoOption(co) = i {
274                            co
275                        } else {
276                            unreachable!()
277                        }
278                    })
279                    .cloned()
280                    .collect();
281                CryptoOption::encode_batch(metadata, &crypto_options)
282            }
283            "CryptoOptionSpread" => {
284                let spreads: Vec<_> = instruments
285                    .iter()
286                    .map(|i| {
287                        if let Self::CryptoOptionSpread(cos) = i {
288                            cos
289                        } else {
290                            unreachable!()
291                        }
292                    })
293                    .cloned()
294                    .collect();
295                CryptoOptionSpread::encode_batch(metadata, &spreads)
296            }
297            "CryptoPerpetual" => {
298                let crypto_perps: Vec<_> = instruments
299                    .iter()
300                    .map(|i| {
301                        if let Self::CryptoPerpetual(cp) = i {
302                            cp
303                        } else {
304                            unreachable!()
305                        }
306                    })
307                    .cloned()
308                    .collect();
309                CryptoPerpetual::encode_batch(metadata, &crypto_perps)
310            }
311            "CurrencyPair" => {
312                let currency_pairs: Vec<_> = instruments
313                    .iter()
314                    .map(|i| {
315                        if let Self::CurrencyPair(cp) = i {
316                            cp
317                        } else {
318                            unreachable!()
319                        }
320                    })
321                    .cloned()
322                    .collect();
323                CurrencyPair::encode_batch(metadata, &currency_pairs)
324            }
325            "Equity" => {
326                let equities: Vec<_> = instruments
327                    .iter()
328                    .map(|i| {
329                        if let Self::Equity(e) = i {
330                            e
331                        } else {
332                            unreachable!()
333                        }
334                    })
335                    .cloned()
336                    .collect();
337                Equity::encode_batch(metadata, &equities)
338            }
339            "FuturesContract" => {
340                let futures_contracts: Vec<_> = instruments
341                    .iter()
342                    .map(|i| {
343                        if let Self::FuturesContract(fc) = i {
344                            fc
345                        } else {
346                            unreachable!()
347                        }
348                    })
349                    .cloned()
350                    .collect();
351                FuturesContract::encode_batch(metadata, &futures_contracts)
352            }
353            "FuturesSpread" => {
354                let futures_spreads: Vec<_> = instruments
355                    .iter()
356                    .map(|i| {
357                        if let Self::FuturesSpread(fs) = i {
358                            fs
359                        } else {
360                            unreachable!()
361                        }
362                    })
363                    .cloned()
364                    .collect();
365                FuturesSpread::encode_batch(metadata, &futures_spreads)
366            }
367            "IndexInstrument" => {
368                let index_instruments: Vec<_> = instruments
369                    .iter()
370                    .map(|i| {
371                        if let Self::IndexInstrument(ii) = i {
372                            ii
373                        } else {
374                            unreachable!()
375                        }
376                    })
377                    .cloned()
378                    .collect();
379                IndexInstrument::encode_batch(metadata, &index_instruments)
380            }
381            "OptionContract" => {
382                let option_contracts: Vec<_> = instruments
383                    .iter()
384                    .map(|i| {
385                        if let Self::OptionContract(oc) = i {
386                            oc
387                        } else {
388                            unreachable!()
389                        }
390                    })
391                    .cloned()
392                    .collect();
393                OptionContract::encode_batch(metadata, &option_contracts)
394            }
395            "OptionSpread" => {
396                let option_spreads: Vec<_> = instruments
397                    .iter()
398                    .map(|i| {
399                        if let Self::OptionSpread(os) = i {
400                            os
401                        } else {
402                            unreachable!()
403                        }
404                    })
405                    .cloned()
406                    .collect();
407                OptionSpread::encode_batch(metadata, &option_spreads)
408            }
409            "PerpetualContract" => {
410                let perpetual_contracts: Vec<_> = instruments
411                    .iter()
412                    .map(|i| {
413                        if let Self::PerpetualContract(pc) = i {
414                            pc
415                        } else {
416                            unreachable!()
417                        }
418                    })
419                    .cloned()
420                    .collect();
421                PerpetualContract::encode_batch(metadata, &perpetual_contracts)
422            }
423            "TokenizedAsset" => {
424                let tokenized_assets: Vec<_> = instruments
425                    .iter()
426                    .map(|i| {
427                        if let Self::TokenizedAsset(ta) = i {
428                            ta
429                        } else {
430                            unreachable!()
431                        }
432                    })
433                    .cloned()
434                    .collect();
435                TokenizedAsset::encode_batch(metadata, &tokenized_assets)
436            }
437            _ => Err(ArrowError::InvalidArgumentError(format!(
438                "Instrument type {type_name} serialization not yet implemented"
439            ))),
440        }
441    }
442
443    fn metadata(&self) -> HashMap<String, String> {
444        let mut metadata = HashMap::new();
445        metadata.insert(
446            KEY_INSTRUMENT_ID.to_string(),
447            Instrument::id(self).to_string(),
448        );
449
450        let type_name = match self {
451            Self::Cfd(_) => "Cfd",
452            Self::Commodity(_) => "Commodity",
453            Self::CurrencyPair(_) => "CurrencyPair",
454            Self::Equity(_) => "Equity",
455            Self::CryptoFuture(_) => "CryptoFuture",
456            Self::CryptoFuturesSpread(_) => "CryptoFuturesSpread",
457            Self::CryptoPerpetual(_) => "CryptoPerpetual",
458            Self::CryptoOption(_) => "CryptoOption",
459            Self::CryptoOptionSpread(_) => "CryptoOptionSpread",
460            Self::FuturesContract(_) => "FuturesContract",
461            Self::FuturesSpread(_) => "FuturesSpread",
462            Self::IndexInstrument(_) => "IndexInstrument",
463            Self::OptionContract(_) => "OptionContract",
464            Self::OptionSpread(_) => "OptionSpread",
465            Self::BinaryOption(_) => "BinaryOption",
466            Self::Betting(_) => "BettingInstrument",
467            Self::PerpetualContract(_) => "PerpetualContract",
468            Self::TokenizedAsset(_) => "TokenizedAsset",
469        };
470        metadata.insert("class".to_string(), type_name.to_string());
471        metadata
472    }
473}
474
475/// Decode InstrumentAny from RecordBatch
476/// (Cannot implement DecodeFromRecordBatch trait due to `Into<Data>` bound)
477///
478/// # Errors
479///
480/// Returns an `EncodingError` if the RecordBatch cannot be decoded.
481pub fn decode_instrument_any_batch(
482    #[allow(unused)] metadata: &HashMap<String, String>,
483    record_batch: &RecordBatch,
484) -> Result<Vec<InstrumentAny>, EncodingError> {
485    let type_name = metadata
486        .get("class")
487        .map(String::as_str)
488        .ok_or_else(|| EncodingError::MissingMetadata("class"))?;
489
490    match type_name {
491        "Cfd" => {
492            let cfds = cfd::decode_cfd_batch(metadata, record_batch)?;
493            Ok(cfds.into_iter().map(InstrumentAny::Cfd).collect())
494        }
495        "Commodity" => {
496            let commodities = commodity::decode_commodity_batch(metadata, record_batch)?;
497            Ok(commodities
498                .into_iter()
499                .map(InstrumentAny::Commodity)
500                .collect())
501        }
502        "BettingInstrument" => {
503            let betting = betting::decode_betting_instrument_batch(metadata, record_batch)?;
504            Ok(betting.into_iter().map(InstrumentAny::Betting).collect())
505        }
506        "BinaryOption" => {
507            let binary_options = binary_option::decode_binary_option_batch(metadata, record_batch)?;
508            Ok(binary_options
509                .into_iter()
510                .map(InstrumentAny::BinaryOption)
511                .collect())
512        }
513        "CryptoFuture" => {
514            let crypto_futures = crypto_future::decode_crypto_future_batch(metadata, record_batch)?;
515            Ok(crypto_futures
516                .into_iter()
517                .map(InstrumentAny::CryptoFuture)
518                .collect())
519        }
520        "CryptoFuturesSpread" => {
521            let spreads =
522                crypto_futures_spread::decode_crypto_futures_spread_batch(metadata, record_batch)?;
523            Ok(spreads
524                .into_iter()
525                .map(InstrumentAny::CryptoFuturesSpread)
526                .collect())
527        }
528        "CryptoOption" => {
529            let crypto_options = crypto_option::decode_crypto_option_batch(metadata, record_batch)?;
530            Ok(crypto_options
531                .into_iter()
532                .map(InstrumentAny::CryptoOption)
533                .collect())
534        }
535        "CryptoOptionSpread" => {
536            let spreads =
537                crypto_option_spread::decode_crypto_option_spread_batch(metadata, record_batch)?;
538            Ok(spreads
539                .into_iter()
540                .map(InstrumentAny::CryptoOptionSpread)
541                .collect())
542        }
543        "CryptoPerpetual" => {
544            let crypto_perps =
545                crypto_perpetual::decode_crypto_perpetual_batch(metadata, record_batch)?;
546            Ok(crypto_perps
547                .into_iter()
548                .map(InstrumentAny::CryptoPerpetual)
549                .collect())
550        }
551        "CurrencyPair" => {
552            let currency_pairs = currency_pair::decode_currency_pair_batch(metadata, record_batch)?;
553            Ok(currency_pairs
554                .into_iter()
555                .map(InstrumentAny::CurrencyPair)
556                .collect())
557        }
558        "Equity" => {
559            let equities = equity::decode_equity_batch(metadata, record_batch)?;
560            Ok(equities.into_iter().map(InstrumentAny::Equity).collect())
561        }
562        "FuturesContract" => {
563            let futures_contracts =
564                futures_contract::decode_futures_contract_batch(metadata, record_batch)?;
565            Ok(futures_contracts
566                .into_iter()
567                .map(InstrumentAny::FuturesContract)
568                .collect())
569        }
570        "FuturesSpread" => {
571            let futures_spreads =
572                futures_spread::decode_futures_spread_batch(metadata, record_batch)?;
573            Ok(futures_spreads
574                .into_iter()
575                .map(InstrumentAny::FuturesSpread)
576                .collect())
577        }
578        "IndexInstrument" => {
579            let index_instruments =
580                index_instrument::decode_index_instrument_batch(metadata, record_batch)?;
581            Ok(index_instruments
582                .into_iter()
583                .map(InstrumentAny::IndexInstrument)
584                .collect())
585        }
586        "OptionContract" => {
587            let option_contracts =
588                option_contract::decode_option_contract_batch(metadata, record_batch)?;
589            Ok(option_contracts
590                .into_iter()
591                .map(InstrumentAny::OptionContract)
592                .collect())
593        }
594        "OptionSpread" => {
595            let option_spreads = option_spread::decode_option_spread_batch(metadata, record_batch)?;
596            Ok(option_spreads
597                .into_iter()
598                .map(InstrumentAny::OptionSpread)
599                .collect())
600        }
601        "PerpetualContract" => {
602            let perpetual_contracts =
603                perpetual_contract::decode_perpetual_contract_batch(metadata, record_batch)?;
604            Ok(perpetual_contracts
605                .into_iter()
606                .map(InstrumentAny::PerpetualContract)
607                .collect())
608        }
609        "TokenizedAsset" => {
610            let tokenized_assets =
611                tokenized_asset::decode_tokenized_asset_batch(metadata, record_batch)?;
612            Ok(tokenized_assets
613                .into_iter()
614                .map(InstrumentAny::TokenizedAsset)
615                .collect())
616        }
617        _ => Err(EncodingError::ParseError(
618            "class",
619            format!("Unknown instrument type: {type_name}"),
620        )),
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use std::sync::Arc;
627
628    use arrow::array::{ArrayRef, StringArray, UInt8Array};
629    use nautilus_core::UnixNanos;
630    use nautilus_model::{
631        enums::CurrencyType,
632        identifiers::{InstrumentId, Symbol},
633        instruments::{Instrument, InstrumentAny, currency_pair::CurrencyPair},
634        types::{Currency, Price, Quantity},
635    };
636    use rstest::rstest;
637    use ustr::Ustr;
638
639    use super::*;
640
641    #[rstest]
642    fn test_get_schema() {
643        let mut metadata = HashMap::new();
644        metadata.insert("class".to_string(), "CurrencyPair".to_string());
645        let schema = InstrumentAny::get_schema(Some(metadata));
646        assert!(schema.fields().len() >= 20);
647        assert_eq!(schema.field(0).name(), "id");
648    }
649
650    #[rstest]
651    #[case("")]
652    #[case("   ")]
653    #[case("\t\n")]
654    fn test_decode_currency_empty_or_whitespace_errors(#[case] value: &str) {
655        let result = decode_currency(value, "currency", "test.currency", 7);
656        let err = result.expect_err("empty code must surface EncodingError");
657        match err {
658            EncodingError::ParseError(field, msg) => {
659                assert_eq!(field, "currency");
660                assert!(
661                    msg.contains("row 7"),
662                    "message should include row index, found: {msg}",
663                );
664                assert!(
665                    msg.contains("empty currency code"),
666                    "message should describe empty code, found: {msg}",
667                );
668            }
669            other => panic!("unexpected error variant: {other:?}"),
670        }
671        // Ensure the fallback did not register a phantom currency under the empty key.
672        assert!(Currency::try_from_str(value.trim()).is_none());
673    }
674
675    #[rstest]
676    #[case("USD", CurrencyType::Fiat, 2)]
677    #[case("BTC", CurrencyType::Crypto, 8)]
678    #[case("XAU", CurrencyType::CommodityBacked, 2)]
679    fn test_decode_currency_known_code_preserves_metadata(
680        #[case] code: &str,
681        #[case] expected_type: CurrencyType,
682        #[case] expected_precision: u8,
683    ) {
684        let currency = decode_currency(code, "currency", "test.currency", 0).unwrap();
685        assert_eq!(currency.code.as_str(), code);
686        assert_eq!(currency.currency_type, expected_type);
687        assert_eq!(currency.precision, expected_precision);
688    }
689
690    #[rstest]
691    fn test_decode_currency_unknown_code_registers_as_crypto() {
692        let code = "XDECTEST";
693        assert!(
694            Currency::try_from_str(code).is_none(),
695            "test precondition: '{code}' must not be pre-registered",
696        );
697
698        let currency = decode_currency(code, "base_currency", "test.base_currency", 0).unwrap();
699        assert_eq!(currency.code.as_str(), code);
700        assert_eq!(currency.currency_type, CurrencyType::Crypto);
701        assert_eq!(currency.precision, 8);
702        assert_eq!(currency.iso4217, 0);
703
704        let registered = Currency::try_from_str(code).expect("unknown code must be registered");
705        assert_eq!(registered, currency);
706    }
707
708    #[rstest]
709    fn test_encode_decode_round_trip() {
710        let instrument_id = InstrumentId::from("EUR/USD.SIM");
711        let currency_pair = CurrencyPair::new(
712            instrument_id,
713            Symbol::from("EUR/USD"),
714            Currency::from("EUR"),
715            Currency::from("USD"),
716            5,
717            0, // size_precision must match size_increment precision (0)
718            Price::new(0.00001, 5),
719            Quantity::new(1.0, 0), // precision 0
720            None,                  // multiplier
721            None,                  // lot_size
722            None,                  // max_quantity
723            None,                  // min_quantity
724            None,                  // max_notional
725            None,                  // min_notional
726            None,                  // max_price
727            None,                  // min_price
728            None,                  // margin_init
729            None,                  // margin_maint
730            None,                  // maker_fee
731            None,                  // taker_fee
732            Some(Ustr::from("FOREX_5DECIMAL")),
733            None, // info
734            UnixNanos::default(),
735            UnixNanos::default(),
736        );
737        let instrument = InstrumentAny::CurrencyPair(currency_pair);
738
739        let metadata = instrument.metadata();
740        let record_batch =
741            InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
742        let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
743
744        assert_eq!(decoded.len(), 1);
745        assert_eq!(Instrument::id(&decoded[0]), Instrument::id(&instrument));
746        assert_eq!(
747            Instrument::raw_symbol(&decoded[0]),
748            Instrument::raw_symbol(&instrument)
749        );
750        assert_eq!(
751            Instrument::asset_class(&decoded[0]),
752            Instrument::asset_class(&instrument)
753        );
754
755        match (&decoded[0], &instrument) {
756            (InstrumentAny::CurrencyPair(decoded_cp), InstrumentAny::CurrencyPair(original_cp)) => {
757                assert_eq!(decoded_cp.id, original_cp.id);
758                assert_eq!(decoded_cp.base_currency, original_cp.base_currency);
759                assert_eq!(decoded_cp.quote_currency, original_cp.quote_currency);
760                assert_eq!(decoded_cp.price_precision, original_cp.price_precision);
761                assert_eq!(decoded_cp.size_precision, original_cp.size_precision);
762                assert_eq!(decoded_cp.tick_scheme, original_cp.tick_scheme);
763            }
764            _ => panic!("Decoded instrument type mismatch"),
765        }
766    }
767
768    #[rstest]
769    fn test_decode_currency_pair_without_tick_scheme_column_defaults_none() {
770        let instrument_id = InstrumentId::from("EUR/USD.SIM");
771        let currency_pair = CurrencyPair::new(
772            instrument_id,
773            Symbol::from("EUR/USD"),
774            Currency::from("EUR"),
775            Currency::from("USD"),
776            5,
777            0,
778            Price::new(0.00001, 5),
779            Quantity::new(1.0, 0),
780            None, // multiplier
781            None, // lot_size
782            None, // max_quantity
783            None, // min_quantity
784            None, // max_notional
785            None, // min_notional
786            None, // max_price
787            None, // min_price
788            None, // margin_init
789            None, // margin_maint
790            None, // maker_fee
791            None, // taker_fee
792            Some(Ustr::from("FOREX_5DECIMAL")),
793            None, // info
794            UnixNanos::default(),
795            UnixNanos::default(),
796        );
797        let instrument = InstrumentAny::CurrencyPair(currency_pair);
798
799        let metadata = instrument.metadata();
800        let record_batch =
801            InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
802        let record_batch = batch_without_column(&record_batch, "tick_scheme");
803        let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
804
805        assert_eq!(decoded.len(), 1);
806        match &decoded[0] {
807            InstrumentAny::CurrencyPair(decoded_cp) => {
808                assert_eq!(decoded_cp.id, instrument.id());
809                assert_eq!(decoded_cp.tick_scheme, None);
810            }
811            _ => panic!("Decoded instrument type mismatch"),
812        }
813    }
814
815    #[rstest]
816    fn test_encode_decode_round_trip_equity() {
817        use nautilus_model::instruments::{Instrument, equity::Equity};
818
819        let instrument_id = InstrumentId::from("AAPL.NASDAQ");
820        let equity = Equity::new(
821            instrument_id,
822            Symbol::from("AAPL"),
823            None, // isin
824            Currency::from("USD"),
825            2,
826            Price::new(0.01, 2),
827            None, // lot_size
828            None, // max_quantity
829            None, // min_quantity
830            None, // max_price
831            None, // min_price
832            None, // margin_init
833            None, // margin_maint
834            None, // maker_fee
835            None, // taker_fee
836            None, // tick_scheme
837            None, // info
838            UnixNanos::default(),
839            UnixNanos::default(),
840        );
841        let instrument = InstrumentAny::Equity(equity);
842
843        let metadata = instrument.metadata();
844        let record_batch =
845            InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&instrument)).unwrap();
846        let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
847        assert_eq!(decoded.len(), 1);
848        assert_eq!(Instrument::id(&decoded[0]), Instrument::id(&instrument));
849        assert_eq!(
850            Instrument::raw_symbol(&decoded[0]),
851            Instrument::raw_symbol(&instrument)
852        );
853        assert_eq!(
854            Instrument::asset_class(&decoded[0]),
855            Instrument::asset_class(&instrument)
856        );
857
858        match (&decoded[0], &instrument) {
859            (InstrumentAny::Equity(decoded_eq), InstrumentAny::Equity(original_eq)) => {
860                assert_eq!(decoded_eq.id, original_eq.id);
861                assert_eq!(decoded_eq.currency, original_eq.currency);
862                assert_eq!(decoded_eq.price_precision, original_eq.price_precision);
863            }
864            _ => panic!("Decoded instrument type mismatch"),
865        }
866    }
867
868    fn roundtrip_case(instrument: &InstrumentAny) {
869        let metadata = instrument.metadata();
870        let record_batch =
871            InstrumentAny::encode_batch(&metadata, std::slice::from_ref(instrument)).unwrap();
872        let decoded = decode_instrument_any_batch(&metadata, &record_batch).unwrap();
873
874        assert_eq!(decoded.len(), 1);
875        assert_eq!(Instrument::id(&decoded[0]), Instrument::id(instrument));
876        assert_eq!(
877            Instrument::raw_symbol(&decoded[0]),
878            Instrument::raw_symbol(instrument)
879        );
880        assert_eq!(
881            Instrument::asset_class(&decoded[0]),
882            Instrument::asset_class(instrument)
883        );
884        assert_eq!(
885            Instrument::instrument_class(&decoded[0]),
886            Instrument::instrument_class(instrument)
887        );
888        assert_eq!(
889            Instrument::price_precision(&decoded[0]),
890            Instrument::price_precision(instrument)
891        );
892        assert_eq!(
893            Instrument::size_precision(&decoded[0]),
894            Instrument::size_precision(instrument)
895        );
896        assert_eq!(
897            Instrument::quote_currency(&decoded[0]),
898            Instrument::quote_currency(instrument)
899        );
900        assert_eq!(
901            std::mem::discriminant(&decoded[0]),
902            std::mem::discriminant(instrument),
903            "decoded variant must match encoded variant"
904        );
905    }
906
907    fn batch_without_column(record_batch: &RecordBatch, column_name: &str) -> RecordBatch {
908        let schema = record_batch.schema();
909        let column_index = schema.index_of(column_name).unwrap();
910        let fields: Vec<_> = schema
911            .fields()
912            .iter()
913            .enumerate()
914            .filter(|(index, _)| *index != column_index)
915            .map(|(_, field)| field.as_ref().clone())
916            .collect();
917        let columns = record_batch
918            .columns()
919            .iter()
920            .enumerate()
921            .filter(|(index, _)| *index != column_index)
922            .map(|(_, column)| Arc::clone(column))
923            .collect();
924        let new_schema = Schema::new_with_metadata(fields, schema.metadata().clone());
925
926        RecordBatch::try_new(Arc::new(new_schema), columns).unwrap()
927    }
928
929    fn batch_with_null_string_column(record_batch: &RecordBatch, column_name: &str) -> RecordBatch {
930        let schema = record_batch.schema();
931        let column_index = schema.index_of(column_name).unwrap();
932        let mut columns = record_batch.columns().to_vec();
933        let null_column: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
934        columns[column_index] = null_column;
935
936        RecordBatch::try_new(schema, columns).unwrap()
937    }
938
939    fn batch_with_uint8_column(
940        record_batch: &RecordBatch,
941        column_name: &str,
942        values: Vec<u8>,
943    ) -> RecordBatch {
944        let schema = record_batch.schema();
945        let column_index = schema.index_of(column_name).unwrap();
946        let mut columns = record_batch.columns().to_vec();
947        columns[column_index] = Arc::new(UInt8Array::from(values));
948
949        RecordBatch::try_new(schema, columns).unwrap()
950    }
951
952    #[rstest]
953    #[case::binary_option(InstrumentAny::BinaryOption(
954        nautilus_model::instruments::stubs::binary_option()
955    ))]
956    #[case::cfd(InstrumentAny::Cfd(nautilus_model::instruments::stubs::cfd_gold()))]
957    #[case::commodity(InstrumentAny::Commodity(
958        nautilus_model::instruments::stubs::commodity_gold()
959    ))]
960    #[case::crypto_future(InstrumentAny::CryptoFuture(
961        nautilus_model::instruments::stubs::crypto_future_btcusdt(
962            2,
963            6,
964            Price::from("0.01"),
965            Quantity::from("0.000001"),
966        )
967    ))]
968    #[case::crypto_futures_spread(InstrumentAny::CryptoFuturesSpread(
969        nautilus_model::instruments::stubs::crypto_futures_spread_btc_deribit()
970    ))]
971    #[case::crypto_option(InstrumentAny::CryptoOption(
972        nautilus_model::instruments::stubs::crypto_option_btc_deribit(
973            3,
974            1,
975            Price::from("0.001"),
976            Quantity::from("0.1"),
977        )
978    ))]
979    #[case::crypto_option_spread(InstrumentAny::CryptoOptionSpread(
980        nautilus_model::instruments::stubs::crypto_option_spread_btc_deribit()
981    ))]
982    #[case::crypto_perpetual(InstrumentAny::CryptoPerpetual(
983        nautilus_model::instruments::stubs::crypto_perpetual_ethusdt()
984    ))]
985    #[case::currency_pair(InstrumentAny::CurrencyPair(
986        nautilus_model::instruments::stubs::currency_pair_btcusdt()
987    ))]
988    #[case::equity(InstrumentAny::Equity(nautilus_model::instruments::stubs::equity_aapl()))]
989    #[case::futures_contract(InstrumentAny::FuturesContract(
990        nautilus_model::instruments::stubs::futures_contract_es(None, None,)
991    ))]
992    #[case::futures_spread(InstrumentAny::FuturesSpread(
993        nautilus_model::instruments::stubs::futures_spread_es()
994    ))]
995    #[case::index_instrument(InstrumentAny::IndexInstrument(
996        nautilus_model::instruments::stubs::index_instrument_spx()
997    ))]
998    #[case::option_contract(InstrumentAny::OptionContract(
999        nautilus_model::instruments::stubs::option_contract_appl()
1000    ))]
1001    #[case::option_spread(InstrumentAny::OptionSpread(
1002        nautilus_model::instruments::stubs::option_spread()
1003    ))]
1004    #[case::perpetual_contract(InstrumentAny::PerpetualContract(
1005        nautilus_model::instruments::stubs::perpetual_contract_eurusd()
1006    ))]
1007    #[case::tokenized_asset(InstrumentAny::TokenizedAsset(
1008        nautilus_model::instruments::stubs::tokenized_asset_aaplx()
1009    ))]
1010    fn test_decode_instrument_checked_constructor_error(#[case] instrument: InstrumentAny) {
1011        let metadata = instrument.metadata();
1012        let class = metadata.get("class").unwrap();
1013        let first_row_price_precision = Instrument::price_precision(&instrument);
1014        let instruments = vec![instrument.clone(), instrument];
1015        let record_batch = InstrumentAny::encode_batch(&metadata, &instruments).unwrap();
1016        let record_batch = batch_with_uint8_column(
1017            &record_batch,
1018            "price_precision",
1019            vec![first_row_price_precision, u8::MAX],
1020        );
1021
1022        let error = decode_instrument_any_batch(&metadata, &record_batch)
1023            .expect_err("invalid precision must return EncodingError");
1024
1025        match error {
1026            EncodingError::ParseError(field, message) => {
1027                assert_eq!(field, INSTRUMENT_VALIDATION_FIELD);
1028                assert!(
1029                    message.contains(class),
1030                    "message should include instrument class, found: {message}",
1031                );
1032                assert!(
1033                    message.starts_with("row 1:"),
1034                    "message should include row index, found: {message}",
1035                );
1036                assert!(
1037                    message.contains("price_precision"),
1038                    "message should include failed precision, found: {message}",
1039                );
1040            }
1041            other => panic!("unexpected error variant: {other:?}"),
1042        }
1043    }
1044
1045    #[rstest]
1046    fn test_roundtrip_betting() {
1047        use nautilus_model::instruments::stubs::betting;
1048        roundtrip_case(&InstrumentAny::Betting(betting()));
1049    }
1050
1051    #[rstest]
1052    fn test_roundtrip_binary_option() {
1053        use nautilus_model::instruments::stubs::binary_option;
1054        roundtrip_case(&InstrumentAny::BinaryOption(binary_option()));
1055    }
1056
1057    #[rstest]
1058    fn test_roundtrip_cfd() {
1059        use nautilus_model::instruments::stubs::cfd_gold;
1060        roundtrip_case(&InstrumentAny::Cfd(cfd_gold()));
1061    }
1062
1063    #[rstest]
1064    fn test_roundtrip_commodity() {
1065        use nautilus_model::instruments::stubs::commodity_gold;
1066        roundtrip_case(&InstrumentAny::Commodity(commodity_gold()));
1067    }
1068
1069    #[rstest]
1070    fn test_roundtrip_crypto_future() {
1071        use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1072
1073        let mut inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1074        inst.lot_size = Quantity::from("0.25");
1075        let any = InstrumentAny::CryptoFuture(inst.clone());
1076        roundtrip_case(&any);
1077        let metadata = any.metadata();
1078        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1079        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1080        let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1081            panic!("decoded variant is not CryptoFuture");
1082        };
1083        assert_eq!(decoded_inst.lot_size, inst.lot_size);
1084    }
1085
1086    #[rstest]
1087    fn test_decode_crypto_future_without_lot_size_column_defaults_to_one() {
1088        use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1089
1090        let inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1091        let any = InstrumentAny::CryptoFuture(inst);
1092        let metadata = any.metadata();
1093        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1094        let batch = batch_without_column(&batch, "lot_size");
1095
1096        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1097
1098        let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1099            panic!("decoded variant is not CryptoFuture");
1100        };
1101        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1102    }
1103
1104    #[rstest]
1105    fn test_decode_crypto_future_null_lot_size_defaults_to_one() {
1106        use nautilus_model::instruments::stubs::crypto_future_btcusdt;
1107
1108        let inst = crypto_future_btcusdt(2, 6, Price::from("0.01"), Quantity::from("0.000001"));
1109        let any = InstrumentAny::CryptoFuture(inst);
1110        let metadata = any.metadata();
1111        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1112        let batch = batch_with_null_string_column(&batch, "lot_size");
1113
1114        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1115
1116        let InstrumentAny::CryptoFuture(decoded_inst) = &decoded[0] else {
1117            panic!("decoded variant is not CryptoFuture");
1118        };
1119        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1120    }
1121
1122    #[rstest]
1123    fn test_roundtrip_crypto_option() {
1124        use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1125
1126        let mut inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1127        inst.lot_size = Quantity::from("0.5");
1128        let any = InstrumentAny::CryptoOption(inst.clone());
1129        roundtrip_case(&any);
1130        let metadata = any.metadata();
1131        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1132        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1133        let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1134            panic!("decoded variant is not CryptoOption");
1135        };
1136        assert_eq!(decoded_inst.lot_size, inst.lot_size);
1137    }
1138
1139    #[rstest]
1140    fn test_decode_crypto_option_without_lot_size_column_defaults_to_one() {
1141        use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1142
1143        let inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1144        let any = InstrumentAny::CryptoOption(inst);
1145        let metadata = any.metadata();
1146        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1147        let batch = batch_without_column(&batch, "lot_size");
1148
1149        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1150
1151        let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1152            panic!("decoded variant is not CryptoOption");
1153        };
1154        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1155    }
1156
1157    #[rstest]
1158    fn test_decode_crypto_option_null_lot_size_defaults_to_one() {
1159        use nautilus_model::instruments::stubs::crypto_option_btc_deribit;
1160
1161        let inst = crypto_option_btc_deribit(3, 1, Price::from("0.001"), Quantity::from("0.1"));
1162        let any = InstrumentAny::CryptoOption(inst);
1163        let metadata = any.metadata();
1164        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1165        let batch = batch_with_null_string_column(&batch, "lot_size");
1166
1167        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1168
1169        let InstrumentAny::CryptoOption(decoded_inst) = &decoded[0] else {
1170            panic!("decoded variant is not CryptoOption");
1171        };
1172        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1173    }
1174
1175    #[rstest]
1176    fn test_roundtrip_crypto_futures_spread() {
1177        use nautilus_model::instruments::{Instrument, stubs::crypto_futures_spread_btc_deribit};
1178        let inst = crypto_futures_spread_btc_deribit();
1179        let any = InstrumentAny::CryptoFuturesSpread(inst.clone());
1180        roundtrip_case(&any);
1181        let metadata = any.metadata();
1182        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1183        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1184        let InstrumentAny::CryptoFuturesSpread(decoded_inst) = &decoded[0] else {
1185            panic!("decoded variant is not CryptoFuturesSpread");
1186        };
1187        assert_eq!(decoded_inst.lot_size, inst.lot_size);
1188        assert_eq!(decoded_inst.is_inverse, inst.is_inverse);
1189        assert_eq!(decoded_inst.strategy_type, inst.strategy_type);
1190        assert_eq!(decoded_inst.settlement_currency, inst.settlement_currency);
1191        assert_eq!(Instrument::id(decoded_inst), Instrument::id(&inst));
1192    }
1193
1194    #[rstest]
1195    fn test_roundtrip_crypto_option_spread() {
1196        use nautilus_model::instruments::{Instrument, stubs::crypto_option_spread_btc_deribit};
1197        let inst = crypto_option_spread_btc_deribit();
1198        let any = InstrumentAny::CryptoOptionSpread(inst.clone());
1199        roundtrip_case(&any);
1200        let metadata = any.metadata();
1201        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1202        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1203        let InstrumentAny::CryptoOptionSpread(decoded_inst) = &decoded[0] else {
1204            panic!("decoded variant is not CryptoOptionSpread");
1205        };
1206        // Deribit BTC option combos carry min_trade_amount=0.1, which sets
1207        // lot_size=0.1; dropping the lot_size Arrow column would silently
1208        // default it back to 1
1209        assert_eq!(decoded_inst.lot_size, inst.lot_size);
1210        assert_eq!(decoded_inst.size_precision, inst.size_precision);
1211        assert_eq!(decoded_inst.size_increment, inst.size_increment);
1212        assert_eq!(decoded_inst.is_inverse, inst.is_inverse);
1213        assert_eq!(decoded_inst.strategy_type, inst.strategy_type);
1214        assert_eq!(decoded_inst.settlement_currency, inst.settlement_currency);
1215        assert_eq!(Instrument::id(decoded_inst), Instrument::id(&inst));
1216    }
1217
1218    #[rstest]
1219    fn test_roundtrip_crypto_perpetual_inverse() {
1220        use nautilus_model::instruments::stubs::xbtusd_bitmex;
1221        roundtrip_case(&InstrumentAny::CryptoPerpetual(xbtusd_bitmex()));
1222    }
1223
1224    #[rstest]
1225    fn test_roundtrip_crypto_perpetual_linear() {
1226        use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1227
1228        let mut inst = crypto_perpetual_ethusdt();
1229        inst.lot_size = Quantity::from("0.005");
1230        let any = InstrumentAny::CryptoPerpetual(inst.clone());
1231        roundtrip_case(&any);
1232        let metadata = any.metadata();
1233        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1234        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1235        let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1236            panic!("decoded variant is not CryptoPerpetual");
1237        };
1238        assert_eq!(decoded_inst.lot_size, inst.lot_size);
1239    }
1240
1241    #[rstest]
1242    fn test_decode_crypto_perpetual_without_lot_size_column_defaults_to_one() {
1243        use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1244
1245        let inst = crypto_perpetual_ethusdt();
1246        let any = InstrumentAny::CryptoPerpetual(inst);
1247        let metadata = any.metadata();
1248        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1249        let batch = batch_without_column(&batch, "lot_size");
1250
1251        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1252
1253        let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1254            panic!("decoded variant is not CryptoPerpetual");
1255        };
1256        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1257    }
1258
1259    #[rstest]
1260    fn test_decode_crypto_perpetual_null_lot_size_defaults_to_one() {
1261        use nautilus_model::instruments::stubs::crypto_perpetual_ethusdt;
1262
1263        let inst = crypto_perpetual_ethusdt();
1264        let any = InstrumentAny::CryptoPerpetual(inst);
1265        let metadata = any.metadata();
1266        let batch = InstrumentAny::encode_batch(&metadata, std::slice::from_ref(&any)).unwrap();
1267        let batch = batch_with_null_string_column(&batch, "lot_size");
1268
1269        let decoded = decode_instrument_any_batch(&metadata, &batch).unwrap();
1270
1271        let InstrumentAny::CryptoPerpetual(decoded_inst) = &decoded[0] else {
1272            panic!("decoded variant is not CryptoPerpetual");
1273        };
1274        assert_eq!(decoded_inst.lot_size, Quantity::from(1));
1275    }
1276
1277    #[rstest]
1278    fn test_roundtrip_futures_contract() {
1279        use nautilus_model::instruments::stubs::futures_contract_es;
1280        roundtrip_case(&InstrumentAny::FuturesContract(futures_contract_es(
1281            None, None,
1282        )));
1283    }
1284
1285    #[rstest]
1286    fn test_roundtrip_futures_spread() {
1287        use nautilus_model::instruments::stubs::futures_spread_es;
1288        roundtrip_case(&InstrumentAny::FuturesSpread(futures_spread_es()));
1289    }
1290
1291    #[rstest]
1292    fn test_roundtrip_index_instrument() {
1293        use nautilus_model::instruments::stubs::index_instrument_spx;
1294        roundtrip_case(&InstrumentAny::IndexInstrument(index_instrument_spx()));
1295    }
1296
1297    #[rstest]
1298    fn test_roundtrip_option_contract() {
1299        use nautilus_model::instruments::stubs::option_contract_appl;
1300        roundtrip_case(&InstrumentAny::OptionContract(option_contract_appl()));
1301    }
1302
1303    #[rstest]
1304    fn test_roundtrip_option_spread() {
1305        use nautilus_model::instruments::stubs::option_spread;
1306        roundtrip_case(&InstrumentAny::OptionSpread(option_spread()));
1307    }
1308
1309    #[rstest]
1310    fn test_roundtrip_perpetual_contract() {
1311        use nautilus_model::instruments::stubs::perpetual_contract_eurusd;
1312        roundtrip_case(&InstrumentAny::PerpetualContract(
1313            perpetual_contract_eurusd(),
1314        ));
1315    }
1316
1317    #[rstest]
1318    fn test_roundtrip_tokenized_asset() {
1319        use nautilus_model::instruments::stubs::tokenized_asset_aaplx;
1320        roundtrip_case(&InstrumentAny::TokenizedAsset(tokenized_asset_aaplx()));
1321    }
1322}