Skip to main content

nautilus_serialization/arrow/
depth.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{
20        Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt32Array, UInt64Array,
21    },
22    datatypes::{DataType, Field, Schema},
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::{
27    data::{
28        depth::{DEPTH10_LEN, OrderBookDepth10},
29        order::BookOrder,
30    },
31    enums::OrderSide,
32    identifiers::InstrumentId,
33    types::{PRICE_UNDEF, QUANTITY_UNDEF, fixed::PRECISION_BYTES},
34};
35
36use super::{
37    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
38    KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column, get_raw_price,
39    get_raw_quantity, validate_precision_bytes,
40};
41use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
42
43fn get_field_data() -> Vec<(&'static str, DataType)> {
44    vec![
45        ("bid_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
46        ("ask_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
47        ("bid_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
48        ("ask_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
49        ("bid_count", DataType::UInt32),
50        ("ask_count", DataType::UInt32),
51    ]
52}
53
54impl ArrowSchemaProvider for OrderBookDepth10 {
55    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
56        let mut fields = Vec::new();
57        let field_data = get_field_data();
58
59        // Schema is of the form:
60        // bid_price_0, bid_price_1, ..., bid_price_9, ask_price_0, ask_price_1
61        for (name, data_type) in field_data {
62            for i in 0..DEPTH10_LEN {
63                fields.push(Field::new(format!("{name}_{i}"), data_type.clone(), false));
64            }
65        }
66
67        fields.push(Field::new("flags", DataType::UInt8, false));
68        fields.push(Field::new("sequence", DataType::UInt64, false));
69        fields.push(Field::new("ts_event", DataType::UInt64, false));
70        fields.push(Field::new("ts_init", DataType::UInt64, false));
71
72        match metadata {
73            Some(metadata) => Schema::new_with_metadata(fields, metadata),
74            None => Schema::new(fields),
75        }
76    }
77}
78
79fn parse_metadata(
80    metadata: &HashMap<String, String>,
81) -> Result<(InstrumentId, u8, u8), EncodingError> {
82    let instrument_id_str = metadata
83        .get(KEY_INSTRUMENT_ID)
84        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
85    let instrument_id = InstrumentId::from_str(instrument_id_str)
86        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
87
88    let price_precision = metadata
89        .get(KEY_PRICE_PRECISION)
90        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
91        .parse::<u8>()
92        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
93
94    let size_precision = metadata
95        .get(KEY_SIZE_PRECISION)
96        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
97        .parse::<u8>()
98        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
99
100    Ok((instrument_id, price_precision, size_precision))
101}
102
103impl EncodeToRecordBatch for OrderBookDepth10 {
104    fn encode_batch(
105        metadata: &HashMap<String, String>,
106        data: &[Self],
107    ) -> Result<RecordBatch, ArrowError> {
108        let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN);
109        let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN);
110        let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN);
111        let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN);
112        let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN);
113        let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN);
114
115        for _ in 0..DEPTH10_LEN {
116            bid_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
117                data.len(),
118                PRECISION_BYTES,
119            ));
120            ask_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
121                data.len(),
122                PRECISION_BYTES,
123            ));
124            bid_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
125                data.len(),
126                PRECISION_BYTES,
127            ));
128            ask_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
129                data.len(),
130                PRECISION_BYTES,
131            ));
132            bid_count_builders.push(UInt32Array::builder(data.len()));
133            ask_count_builders.push(UInt32Array::builder(data.len()));
134        }
135
136        let mut flags_builder = UInt8Array::builder(data.len());
137        let mut sequence_builder = UInt64Array::builder(data.len());
138        let mut ts_event_builder = UInt64Array::builder(data.len());
139        let mut ts_init_builder = UInt64Array::builder(data.len());
140
141        for depth in data {
142            for i in 0..DEPTH10_LEN {
143                bid_price_builders[i]
144                    .append_value(depth.bids[i].price.raw.to_le_bytes())
145                    .unwrap();
146                ask_price_builders[i]
147                    .append_value(depth.asks[i].price.raw.to_le_bytes())
148                    .unwrap();
149                bid_size_builders[i]
150                    .append_value(depth.bids[i].size.raw.to_le_bytes())
151                    .unwrap();
152                ask_size_builders[i]
153                    .append_value(depth.asks[i].size.raw.to_le_bytes())
154                    .unwrap();
155                bid_count_builders[i].append_value(depth.bid_counts[i]);
156                ask_count_builders[i].append_value(depth.ask_counts[i]);
157            }
158
159            flags_builder.append_value(depth.flags);
160            sequence_builder.append_value(depth.sequence);
161            ts_event_builder.append_value(depth.ts_event.as_u64());
162            ts_init_builder.append_value(depth.ts_init.as_u64());
163        }
164
165        let bid_price_arrays = bid_price_builders
166            .into_iter()
167            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
168            .collect::<Vec<_>>();
169        let ask_price_arrays = ask_price_builders
170            .into_iter()
171            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
172            .collect::<Vec<_>>();
173        let bid_size_arrays = bid_size_builders
174            .into_iter()
175            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
176            .collect::<Vec<_>>();
177        let ask_size_arrays = ask_size_builders
178            .into_iter()
179            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
180            .collect::<Vec<_>>();
181        let bid_count_arrays = bid_count_builders
182            .into_iter()
183            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
184            .collect::<Vec<_>>();
185        let ask_count_arrays = ask_count_builders
186            .into_iter()
187            .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
188            .collect::<Vec<_>>();
189
190        let flags_array = Arc::new(flags_builder.finish()) as Arc<dyn Array>;
191        let sequence_array = Arc::new(sequence_builder.finish()) as Arc<dyn Array>;
192        let ts_event_array = Arc::new(ts_event_builder.finish()) as Arc<dyn Array>;
193        let ts_init_array = Arc::new(ts_init_builder.finish()) as Arc<dyn Array>;
194
195        let mut columns = Vec::new();
196        columns.extend(bid_price_arrays);
197        columns.extend(ask_price_arrays);
198        columns.extend(bid_size_arrays);
199        columns.extend(ask_size_arrays);
200        columns.extend(bid_count_arrays);
201        columns.extend(ask_count_arrays);
202        columns.push(flags_array);
203        columns.push(sequence_array);
204        columns.push(ts_event_array);
205        columns.push(ts_init_array);
206
207        RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns)
208    }
209
210    fn metadata(&self) -> HashMap<String, String> {
211        Self::get_metadata(
212            &self.instrument_id,
213            self.bids[0].price.precision,
214            self.bids[0].size.precision,
215        )
216    }
217}
218
219impl DecodeFromRecordBatch for OrderBookDepth10 {
220    fn decode_batch(
221        metadata: &HashMap<String, String>,
222        record_batch: RecordBatch,
223    ) -> Result<Vec<Self>, EncodingError> {
224        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
225        let cols = record_batch.columns();
226
227        let mut bid_prices = Vec::with_capacity(DEPTH10_LEN);
228        let mut ask_prices = Vec::with_capacity(DEPTH10_LEN);
229        let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN);
230        let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN);
231        let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
232        let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
233
234        macro_rules! extract_depth_column {
235            ($array:ty, $name:literal, $i:expr, $offset:expr, $type:expr) => {
236                extract_column::<$array>(cols, concat!($name, "_", stringify!($i)), $offset, $type)?
237            };
238        }
239
240        for i in 0..DEPTH10_LEN {
241            bid_prices.push(extract_depth_column!(
242                FixedSizeBinaryArray,
243                "bid_price",
244                i,
245                i,
246                DataType::FixedSizeBinary(PRECISION_BYTES)
247            ));
248            ask_prices.push(extract_depth_column!(
249                FixedSizeBinaryArray,
250                "ask_price",
251                i,
252                DEPTH10_LEN + i,
253                DataType::FixedSizeBinary(PRECISION_BYTES)
254            ));
255            bid_sizes.push(extract_depth_column!(
256                FixedSizeBinaryArray,
257                "bid_size",
258                i,
259                2 * DEPTH10_LEN + i,
260                DataType::FixedSizeBinary(PRECISION_BYTES)
261            ));
262            ask_sizes.push(extract_depth_column!(
263                FixedSizeBinaryArray,
264                "ask_size",
265                i,
266                3 * DEPTH10_LEN + i,
267                DataType::FixedSizeBinary(PRECISION_BYTES)
268            ));
269            bid_counts.push(extract_depth_column!(
270                UInt32Array,
271                "bid_count",
272                i,
273                4 * DEPTH10_LEN + i,
274                DataType::UInt32
275            ));
276            ask_counts.push(extract_depth_column!(
277                UInt32Array,
278                "ask_count",
279                i,
280                5 * DEPTH10_LEN + i,
281                DataType::UInt32
282            ));
283        }
284
285        for i in 0..DEPTH10_LEN {
286            validate_precision_bytes(bid_prices[i], "bid_price")?;
287            validate_precision_bytes(ask_prices[i], "ask_price")?;
288            validate_precision_bytes(bid_sizes[i], "bid_size")?;
289            validate_precision_bytes(ask_sizes[i], "ask_size")?;
290        }
291
292        let flags = extract_column::<UInt8Array>(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?;
293        let sequence =
294            extract_column::<UInt64Array>(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?;
295        let ts_event =
296            extract_column::<UInt64Array>(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?;
297        let ts_init =
298            extract_column::<UInt64Array>(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?;
299
300        // Map record batch rows to vector of OrderBookDepth10
301        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
302            .map(|row| {
303                let mut bids = [BookOrder::default(); DEPTH10_LEN];
304                let mut asks = [BookOrder::default(); DEPTH10_LEN];
305                let mut bid_count_arr = [0u32; DEPTH10_LEN];
306                let mut ask_count_arr = [0u32; DEPTH10_LEN];
307
308                for i in 0..DEPTH10_LEN {
309                    // Undefined levels (PRICE_UNDEF / QUANTITY_UNDEF sentinels) are
310                    // decoded as NULL_ORDER so the matching engine skips them during
311                    // precision validation (see engine.pyx::process_order_book_depth10).
312                    let bid_price_bytes = bid_prices[i].value(row);
313                    let bid_size_bytes = bid_sizes[i].value(row);
314                    if get_raw_price(bid_price_bytes) == PRICE_UNDEF
315                        || get_raw_quantity(bid_size_bytes) == QUANTITY_UNDEF
316                    {
317                        bids[i] = BookOrder::default();
318                    } else {
319                        let bid_price =
320                            decode_price(bid_price_bytes, price_precision, "bid_price", row)?;
321                        let bid_size =
322                            decode_quantity(bid_size_bytes, size_precision, "bid_size", row)?;
323                        bids[i] = BookOrder::new(OrderSide::Buy, bid_price, bid_size, 0);
324                    }
325
326                    let ask_price_bytes = ask_prices[i].value(row);
327                    let ask_size_bytes = ask_sizes[i].value(row);
328                    if get_raw_price(ask_price_bytes) == PRICE_UNDEF
329                        || get_raw_quantity(ask_size_bytes) == QUANTITY_UNDEF
330                    {
331                        asks[i] = BookOrder::default();
332                    } else {
333                        let ask_price =
334                            decode_price(ask_price_bytes, price_precision, "ask_price", row)?;
335                        let ask_size =
336                            decode_quantity(ask_size_bytes, size_precision, "ask_size", row)?;
337                        asks[i] = BookOrder::new(OrderSide::Sell, ask_price, ask_size, 0);
338                    }
339
340                    bid_count_arr[i] = bid_counts[i].value(row);
341                    ask_count_arr[i] = ask_counts[i].value(row);
342                }
343
344                Ok(Self {
345                    instrument_id,
346                    bids,
347                    asks,
348                    bid_counts: bid_count_arr,
349                    ask_counts: ask_count_arr,
350                    flags: flags.value(row),
351                    sequence: sequence.value(row),
352                    ts_event: ts_event.value(row).into(),
353                    ts_init: ts_init.value(row).into(),
354                })
355            })
356            .collect();
357
358        result
359    }
360}
361
362impl DecodeDataFromRecordBatch for OrderBookDepth10 {
363    fn decode_data_batch(
364        metadata: &HashMap<String, String>,
365        record_batch: RecordBatch,
366    ) -> Result<Vec<Data>, EncodingError> {
367        let depths: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
368        Ok(depths.into_iter().map(Data::from).collect())
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use arrow::datatypes::{DataType, Field};
375    use nautilus_model::{
376        data::stubs::stub_depth10,
377        types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
378    };
379    use pretty_assertions::assert_eq;
380    use rstest::rstest;
381
382    use super::*;
383    use crate::arrow::{get_raw_price, get_raw_quantity};
384
385    #[rstest]
386    fn test_get_schema() {
387        let instrument_id = InstrumentId::from("AAPL.XNAS");
388        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
389        let schema = OrderBookDepth10::get_schema(Some(metadata));
390
391        let mut group_count = 0;
392        let field_data = get_field_data();
393        for (name, data_type) in field_data {
394            for i in 0..DEPTH10_LEN {
395                let field = schema.field(i + group_count * DEPTH10_LEN).clone();
396                assert_eq!(
397                    field,
398                    Field::new(format!("{name}_{i}"), data_type.clone(), false)
399                );
400            }
401
402            group_count += 1;
403        }
404
405        let flags_field = schema.field(group_count * DEPTH10_LEN).clone();
406        assert_eq!(flags_field, Field::new("flags", DataType::UInt8, false));
407        let sequence_field = schema.field(group_count * DEPTH10_LEN + 1).clone();
408        assert_eq!(
409            sequence_field,
410            Field::new("sequence", DataType::UInt64, false)
411        );
412        let ts_event_field = schema.field(group_count * DEPTH10_LEN + 2).clone();
413        assert_eq!(
414            ts_event_field,
415            Field::new("ts_event", DataType::UInt64, false)
416        );
417        let ts_init_field = schema.field(group_count * DEPTH10_LEN + 3).clone();
418        assert_eq!(
419            ts_init_field,
420            Field::new("ts_init", DataType::UInt64, false)
421        );
422
423        assert_eq!(schema.metadata()["instrument_id"], "AAPL.XNAS");
424        assert_eq!(schema.metadata()["price_precision"], "2");
425        assert_eq!(schema.metadata()["size_precision"], "0");
426    }
427
428    #[rstest]
429    fn test_get_schema_map() {
430        let schema_map = OrderBookDepth10::get_schema_map();
431
432        let field_data = get_field_data();
433        for (name, data_type) in field_data {
434            for i in 0..DEPTH10_LEN {
435                let field = schema_map.get(&format!("{name}_{i}")).map(String::as_str);
436                assert_eq!(field, Some(format!("{data_type:?}").as_str()));
437            }
438        }
439
440        assert_eq!(schema_map.get("flags").map(String::as_str), Some("UInt8"));
441        assert_eq!(
442            schema_map.get("sequence").map(String::as_str),
443            Some("UInt64")
444        );
445        assert_eq!(
446            schema_map.get("ts_event").map(String::as_str),
447            Some("UInt64")
448        );
449        assert_eq!(
450            schema_map.get("ts_init").map(String::as_str),
451            Some("UInt64")
452        );
453    }
454
455    #[rstest]
456    fn test_encode_batch(stub_depth10: OrderBookDepth10) {
457        let instrument_id = InstrumentId::from("AAPL.XNAS");
458        let price_precision = 2;
459        let metadata = OrderBookDepth10::get_metadata(&instrument_id, price_precision, 0);
460
461        let data = vec![stub_depth10];
462        let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
463        let columns = record_batch.columns();
464
465        assert_eq!(columns.len(), DEPTH10_LEN * 6 + 4);
466
467        // Extract and test bid prices
468        let bid_prices: Vec<_> = (0..DEPTH10_LEN)
469            .map(|i| {
470                columns[i]
471                    .as_any()
472                    .downcast_ref::<FixedSizeBinaryArray>()
473                    .unwrap()
474            })
475            .collect();
476
477        let expected_bid_prices: Vec<f64> =
478            vec![99.0, 98.0, 97.0, 96.0, 95.0, 94.0, 93.0, 92.0, 91.0, 90.0];
479
480        for (i, bid_price) in bid_prices.iter().enumerate() {
481            assert_eq!(bid_price.len(), 1);
482            assert_eq!(
483                get_raw_price(bid_price.value(0)),
484                (expected_bid_prices[i] * FIXED_SCALAR) as PriceRaw
485            );
486            assert_eq!(
487                Price::from_raw(get_raw_price(bid_price.value(0)), price_precision).as_f64(),
488                expected_bid_prices[i]
489            );
490        }
491
492        // Extract and test ask prices
493        let ask_prices: Vec<_> = (0..DEPTH10_LEN)
494            .map(|i| {
495                columns[DEPTH10_LEN + i]
496                    .as_any()
497                    .downcast_ref::<FixedSizeBinaryArray>()
498                    .unwrap()
499            })
500            .collect();
501
502        let expected_ask_prices: Vec<f64> = vec![
503            100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0,
504        ];
505
506        for (i, ask_price) in ask_prices.iter().enumerate() {
507            assert_eq!(ask_price.len(), 1);
508            assert_eq!(
509                get_raw_price(ask_price.value(0)),
510                (expected_ask_prices[i] * FIXED_SCALAR) as PriceRaw
511            );
512            assert_eq!(
513                Price::from_raw(get_raw_price(ask_price.value(0)), price_precision).as_f64(),
514                expected_ask_prices[i]
515            );
516        }
517
518        // Extract and test bid sizes
519        let bid_sizes: Vec<_> = (0..DEPTH10_LEN)
520            .map(|i| {
521                columns[2 * DEPTH10_LEN + i]
522                    .as_any()
523                    .downcast_ref::<FixedSizeBinaryArray>()
524                    .unwrap()
525            })
526            .collect();
527
528        for (i, bid_size) in bid_sizes.iter().enumerate() {
529            assert_eq!(bid_size.len(), 1);
530            assert_eq!(
531                get_raw_quantity(bid_size.value(0)),
532                ((100.0 * FIXED_SCALAR * (i + 1) as f64) as QuantityRaw)
533            );
534        }
535
536        // Extract and test ask sizes
537        let ask_sizes: Vec<_> = (0..DEPTH10_LEN)
538            .map(|i| {
539                columns[3 * DEPTH10_LEN + i]
540                    .as_any()
541                    .downcast_ref::<FixedSizeBinaryArray>()
542                    .unwrap()
543            })
544            .collect();
545
546        for (i, ask_size) in ask_sizes.iter().enumerate() {
547            assert_eq!(ask_size.len(), 1);
548            assert_eq!(
549                get_raw_quantity(ask_size.value(0)),
550                ((100.0 * FIXED_SCALAR * ((i + 1) as f64)) as QuantityRaw)
551            );
552        }
553
554        // Extract and test bid counts
555        let bid_counts: Vec<_> = (0..DEPTH10_LEN)
556            .map(|i| {
557                columns[4 * DEPTH10_LEN + i]
558                    .as_any()
559                    .downcast_ref::<UInt32Array>()
560                    .unwrap()
561            })
562            .collect();
563
564        for count_values in bid_counts {
565            assert_eq!(count_values.len(), 1);
566            assert_eq!(count_values.value(0), 1);
567        }
568
569        // Extract and test ask counts
570        let ask_counts: Vec<_> = (0..DEPTH10_LEN)
571            .map(|i| {
572                columns[5 * DEPTH10_LEN + i]
573                    .as_any()
574                    .downcast_ref::<UInt32Array>()
575                    .unwrap()
576            })
577            .collect();
578
579        for count_values in ask_counts {
580            assert_eq!(count_values.len(), 1);
581            assert_eq!(count_values.value(0), 1);
582        }
583
584        // Test remaining fields
585        let flags_values = columns[6 * DEPTH10_LEN]
586            .as_any()
587            .downcast_ref::<UInt8Array>()
588            .unwrap();
589        let sequence_values = columns[6 * DEPTH10_LEN + 1]
590            .as_any()
591            .downcast_ref::<UInt64Array>()
592            .unwrap();
593        let ts_event_values = columns[6 * DEPTH10_LEN + 2]
594            .as_any()
595            .downcast_ref::<UInt64Array>()
596            .unwrap();
597        let ts_init_values = columns[6 * DEPTH10_LEN + 3]
598            .as_any()
599            .downcast_ref::<UInt64Array>()
600            .unwrap();
601
602        assert_eq!(flags_values.len(), 1);
603        assert_eq!(flags_values.value(0), 0);
604        assert_eq!(sequence_values.len(), 1);
605        assert_eq!(sequence_values.value(0), 0);
606        assert_eq!(ts_event_values.len(), 1);
607        assert_eq!(ts_event_values.value(0), 1);
608        assert_eq!(ts_init_values.len(), 1);
609        assert_eq!(ts_init_values.value(0), 2);
610    }
611
612    #[rstest]
613    fn test_decode_batch(stub_depth10: OrderBookDepth10) {
614        let instrument_id = InstrumentId::from("AAPL.XNAS");
615        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
616
617        let data = vec![stub_depth10];
618        let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
619        let decoded_data = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
620
621        assert_eq!(decoded_data.len(), 1);
622    }
623
624    #[rstest]
625    fn test_decode_batch_missing_instrument_id_returns_error(stub_depth10: OrderBookDepth10) {
626        let instrument_id = InstrumentId::from("AAPL.XNAS");
627        let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
628        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
629
630        metadata.remove(KEY_INSTRUMENT_ID);
631
632        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
633        assert!(result.is_err());
634        let err = result.unwrap_err();
635        assert!(
636            err.to_string().contains("instrument_id"),
637            "Expected missing instrument_id error, was: {err}"
638        );
639    }
640
641    #[rstest]
642    fn test_decode_batch_missing_price_precision_returns_error(stub_depth10: OrderBookDepth10) {
643        let instrument_id = InstrumentId::from("AAPL.XNAS");
644        let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
645        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
646
647        metadata.remove(KEY_PRICE_PRECISION);
648
649        let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
650        assert!(result.is_err());
651        let err = result.unwrap_err();
652        assert!(
653            err.to_string().contains("price_precision"),
654            "Expected missing price_precision error, was: {err}"
655        );
656    }
657
658    #[rstest]
659    fn test_encode_decode_round_trip(stub_depth10: OrderBookDepth10) {
660        let instrument_id = InstrumentId::from("AAPL.XNAS");
661        let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
662
663        let original = vec![stub_depth10];
664        let record_batch = OrderBookDepth10::encode_batch(&metadata, &original).unwrap();
665        let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
666
667        assert_eq!(decoded.len(), original.len());
668        let orig = &original[0];
669        let dec = &decoded[0];
670
671        assert_eq!(dec.instrument_id, orig.instrument_id);
672        assert_eq!(dec.flags, orig.flags);
673        assert_eq!(dec.sequence, orig.sequence);
674        assert_eq!(dec.ts_event, orig.ts_event);
675        assert_eq!(dec.ts_init, orig.ts_init);
676
677        for i in 0..DEPTH10_LEN {
678            assert_eq!(
679                dec.bids[i].price, orig.bids[i].price,
680                "bid price mismatch at level {i}"
681            );
682            assert_eq!(
683                dec.bids[i].size, orig.bids[i].size,
684                "bid size mismatch at level {i}"
685            );
686            assert_eq!(
687                dec.asks[i].price, orig.asks[i].price,
688                "ask price mismatch at level {i}"
689            );
690            assert_eq!(
691                dec.asks[i].size, orig.asks[i].size,
692                "ask size mismatch at level {i}"
693            );
694        }
695    }
696
697    // Each case toggles the price and/or size sentinel on one bid level and one
698    // ask level. Any sentinel in either field must decode to NULL_ORDER; the
699    // "neither" case is the control proving defined levels still round-trip.
700    #[rstest]
701    #[case::price_only(true, false)]
702    #[case::size_only(false, true)]
703    #[case::both(true, true)]
704    #[case::neither(false, false)]
705    fn test_decode_batch_with_undefined_levels(
706        stub_depth10: OrderBookDepth10,
707        #[case] price_undef: bool,
708        #[case] size_undef: bool,
709    ) {
710        let instrument_id = InstrumentId::from("AAPL.XNAS");
711        let price_precision = 2;
712        let size_precision = 0;
713        let metadata =
714            OrderBookDepth10::get_metadata(&instrument_id, price_precision, size_precision);
715
716        let mut depth = stub_depth10;
717        let original_bid = depth.bids[5];
718        let original_ask = depth.asks[7];
719        let sentinel_bid_price = if price_undef {
720            Price::from_raw(PRICE_UNDEF, 0)
721        } else {
722            original_bid.price
723        };
724        let sentinel_bid_size = if size_undef {
725            Quantity::from_raw(QUANTITY_UNDEF, 0)
726        } else {
727            original_bid.size
728        };
729        depth.bids[5] = BookOrder {
730            side: OrderSide::Buy,
731            price: sentinel_bid_price,
732            size: sentinel_bid_size,
733            order_id: 0,
734        };
735        let sentinel_ask_price = if price_undef {
736            Price::from_raw(PRICE_UNDEF, 0)
737        } else {
738            original_ask.price
739        };
740        let sentinel_ask_size = if size_undef {
741            Quantity::from_raw(QUANTITY_UNDEF, 0)
742        } else {
743            original_ask.size
744        };
745        depth.asks[7] = BookOrder {
746            side: OrderSide::Sell,
747            price: sentinel_ask_price,
748            size: sentinel_ask_size,
749            order_id: 0,
750        };
751
752        let record_batch = OrderBookDepth10::encode_batch(&metadata, &[depth]).unwrap();
753        let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
754
755        assert_eq!(decoded.len(), 1);
756        let decoded = &decoded[0];
757
758        let expect_null = price_undef || size_undef;
759        if expect_null {
760            assert_eq!(decoded.bids[5].side, OrderSide::NoOrderSide);
761            assert_eq!(decoded.bids[5].price.raw, 0);
762            assert_eq!(decoded.bids[5].price.precision, 0);
763            assert_eq!(decoded.bids[5].size.raw, 0);
764            assert_eq!(decoded.bids[5].size.precision, 0);
765
766            assert_eq!(decoded.asks[7].side, OrderSide::NoOrderSide);
767            assert_eq!(decoded.asks[7].price.raw, 0);
768            assert_eq!(decoded.asks[7].price.precision, 0);
769            assert_eq!(decoded.asks[7].size.raw, 0);
770            assert_eq!(decoded.asks[7].size.precision, 0);
771        } else {
772            assert_eq!(decoded.bids[5].side, OrderSide::Buy);
773            assert_eq!(decoded.bids[5].price, original_bid.price);
774            assert_eq!(decoded.bids[5].size, original_bid.size);
775            assert_eq!(decoded.asks[7].side, OrderSide::Sell);
776            assert_eq!(decoded.asks[7].price, original_ask.price);
777            assert_eq!(decoded.asks[7].size, original_ask.size);
778        }
779
780        // Surrounding defined levels always round-trip with the instrument precision
781        assert_eq!(decoded.bids[0].side, OrderSide::Buy);
782        assert_eq!(decoded.bids[0].price.precision, price_precision);
783        assert_eq!(decoded.bids[0].size.precision, size_precision);
784        assert_eq!(decoded.asks[0].side, OrderSide::Sell);
785        assert_eq!(decoded.asks[0].price.precision, price_precision);
786        assert_eq!(decoded.asks[0].size.precision, size_precision);
787    }
788}