1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{
20 Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt32Array, UInt64Array,
21 },
22 datatypes::{DataType, Field, Schema},
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::{
27 data::{
28 depth::{DEPTH10_LEN, OrderBookDepth10},
29 order::BookOrder,
30 },
31 enums::OrderSide,
32 identifiers::InstrumentId,
33 types::{PRICE_UNDEF, QUANTITY_UNDEF, fixed::PRECISION_BYTES},
34};
35
36use super::{
37 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
38 KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column, get_raw_price,
39 get_raw_quantity, validate_precision_bytes,
40};
41use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
42
43fn get_field_data() -> Vec<(&'static str, DataType)> {
44 vec![
45 ("bid_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
46 ("ask_price", DataType::FixedSizeBinary(PRECISION_BYTES)),
47 ("bid_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
48 ("ask_size", DataType::FixedSizeBinary(PRECISION_BYTES)),
49 ("bid_count", DataType::UInt32),
50 ("ask_count", DataType::UInt32),
51 ]
52}
53
54impl ArrowSchemaProvider for OrderBookDepth10 {
55 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
56 let mut fields = Vec::new();
57 let field_data = get_field_data();
58
59 for (name, data_type) in field_data {
62 for i in 0..DEPTH10_LEN {
63 fields.push(Field::new(format!("{name}_{i}"), data_type.clone(), false));
64 }
65 }
66
67 fields.push(Field::new("flags", DataType::UInt8, false));
68 fields.push(Field::new("sequence", DataType::UInt64, false));
69 fields.push(Field::new("ts_event", DataType::UInt64, false));
70 fields.push(Field::new("ts_init", DataType::UInt64, false));
71
72 match metadata {
73 Some(metadata) => Schema::new_with_metadata(fields, metadata),
74 None => Schema::new(fields),
75 }
76 }
77}
78
79fn parse_metadata(
80 metadata: &HashMap<String, String>,
81) -> Result<(InstrumentId, u8, u8), EncodingError> {
82 let instrument_id_str = metadata
83 .get(KEY_INSTRUMENT_ID)
84 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
85 let instrument_id = InstrumentId::from_str(instrument_id_str)
86 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
87
88 let price_precision = metadata
89 .get(KEY_PRICE_PRECISION)
90 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
91 .parse::<u8>()
92 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
93
94 let size_precision = metadata
95 .get(KEY_SIZE_PRECISION)
96 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
97 .parse::<u8>()
98 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
99
100 Ok((instrument_id, price_precision, size_precision))
101}
102
103impl EncodeToRecordBatch for OrderBookDepth10 {
104 fn encode_batch(
105 metadata: &HashMap<String, String>,
106 data: &[Self],
107 ) -> Result<RecordBatch, ArrowError> {
108 let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN);
109 let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN);
110 let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN);
111 let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN);
112 let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN);
113 let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN);
114
115 for _ in 0..DEPTH10_LEN {
116 bid_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
117 data.len(),
118 PRECISION_BYTES,
119 ));
120 ask_price_builders.push(FixedSizeBinaryBuilder::with_capacity(
121 data.len(),
122 PRECISION_BYTES,
123 ));
124 bid_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
125 data.len(),
126 PRECISION_BYTES,
127 ));
128 ask_size_builders.push(FixedSizeBinaryBuilder::with_capacity(
129 data.len(),
130 PRECISION_BYTES,
131 ));
132 bid_count_builders.push(UInt32Array::builder(data.len()));
133 ask_count_builders.push(UInt32Array::builder(data.len()));
134 }
135
136 let mut flags_builder = UInt8Array::builder(data.len());
137 let mut sequence_builder = UInt64Array::builder(data.len());
138 let mut ts_event_builder = UInt64Array::builder(data.len());
139 let mut ts_init_builder = UInt64Array::builder(data.len());
140
141 for depth in data {
142 for i in 0..DEPTH10_LEN {
143 bid_price_builders[i]
144 .append_value(depth.bids[i].price.raw.to_le_bytes())
145 .unwrap();
146 ask_price_builders[i]
147 .append_value(depth.asks[i].price.raw.to_le_bytes())
148 .unwrap();
149 bid_size_builders[i]
150 .append_value(depth.bids[i].size.raw.to_le_bytes())
151 .unwrap();
152 ask_size_builders[i]
153 .append_value(depth.asks[i].size.raw.to_le_bytes())
154 .unwrap();
155 bid_count_builders[i].append_value(depth.bid_counts[i]);
156 ask_count_builders[i].append_value(depth.ask_counts[i]);
157 }
158
159 flags_builder.append_value(depth.flags);
160 sequence_builder.append_value(depth.sequence);
161 ts_event_builder.append_value(depth.ts_event.as_u64());
162 ts_init_builder.append_value(depth.ts_init.as_u64());
163 }
164
165 let bid_price_arrays = bid_price_builders
166 .into_iter()
167 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
168 .collect::<Vec<_>>();
169 let ask_price_arrays = ask_price_builders
170 .into_iter()
171 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
172 .collect::<Vec<_>>();
173 let bid_size_arrays = bid_size_builders
174 .into_iter()
175 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
176 .collect::<Vec<_>>();
177 let ask_size_arrays = ask_size_builders
178 .into_iter()
179 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
180 .collect::<Vec<_>>();
181 let bid_count_arrays = bid_count_builders
182 .into_iter()
183 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
184 .collect::<Vec<_>>();
185 let ask_count_arrays = ask_count_builders
186 .into_iter()
187 .map(|mut b| Arc::new(b.finish()) as Arc<dyn Array>)
188 .collect::<Vec<_>>();
189
190 let flags_array = Arc::new(flags_builder.finish()) as Arc<dyn Array>;
191 let sequence_array = Arc::new(sequence_builder.finish()) as Arc<dyn Array>;
192 let ts_event_array = Arc::new(ts_event_builder.finish()) as Arc<dyn Array>;
193 let ts_init_array = Arc::new(ts_init_builder.finish()) as Arc<dyn Array>;
194
195 let mut columns = Vec::new();
196 columns.extend(bid_price_arrays);
197 columns.extend(ask_price_arrays);
198 columns.extend(bid_size_arrays);
199 columns.extend(ask_size_arrays);
200 columns.extend(bid_count_arrays);
201 columns.extend(ask_count_arrays);
202 columns.push(flags_array);
203 columns.push(sequence_array);
204 columns.push(ts_event_array);
205 columns.push(ts_init_array);
206
207 RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns)
208 }
209
210 fn metadata(&self) -> HashMap<String, String> {
211 Self::get_metadata(
212 &self.instrument_id,
213 self.bids[0].price.precision,
214 self.bids[0].size.precision,
215 )
216 }
217}
218
219impl DecodeFromRecordBatch for OrderBookDepth10 {
220 fn decode_batch(
221 metadata: &HashMap<String, String>,
222 record_batch: RecordBatch,
223 ) -> Result<Vec<Self>, EncodingError> {
224 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
225 let cols = record_batch.columns();
226
227 let mut bid_prices = Vec::with_capacity(DEPTH10_LEN);
228 let mut ask_prices = Vec::with_capacity(DEPTH10_LEN);
229 let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN);
230 let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN);
231 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
232 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
233
234 macro_rules! extract_depth_column {
235 ($array:ty, $name:literal, $i:expr, $offset:expr, $type:expr) => {
236 extract_column::<$array>(cols, concat!($name, "_", stringify!($i)), $offset, $type)?
237 };
238 }
239
240 for i in 0..DEPTH10_LEN {
241 bid_prices.push(extract_depth_column!(
242 FixedSizeBinaryArray,
243 "bid_price",
244 i,
245 i,
246 DataType::FixedSizeBinary(PRECISION_BYTES)
247 ));
248 ask_prices.push(extract_depth_column!(
249 FixedSizeBinaryArray,
250 "ask_price",
251 i,
252 DEPTH10_LEN + i,
253 DataType::FixedSizeBinary(PRECISION_BYTES)
254 ));
255 bid_sizes.push(extract_depth_column!(
256 FixedSizeBinaryArray,
257 "bid_size",
258 i,
259 2 * DEPTH10_LEN + i,
260 DataType::FixedSizeBinary(PRECISION_BYTES)
261 ));
262 ask_sizes.push(extract_depth_column!(
263 FixedSizeBinaryArray,
264 "ask_size",
265 i,
266 3 * DEPTH10_LEN + i,
267 DataType::FixedSizeBinary(PRECISION_BYTES)
268 ));
269 bid_counts.push(extract_depth_column!(
270 UInt32Array,
271 "bid_count",
272 i,
273 4 * DEPTH10_LEN + i,
274 DataType::UInt32
275 ));
276 ask_counts.push(extract_depth_column!(
277 UInt32Array,
278 "ask_count",
279 i,
280 5 * DEPTH10_LEN + i,
281 DataType::UInt32
282 ));
283 }
284
285 for i in 0..DEPTH10_LEN {
286 validate_precision_bytes(bid_prices[i], "bid_price")?;
287 validate_precision_bytes(ask_prices[i], "ask_price")?;
288 validate_precision_bytes(bid_sizes[i], "bid_size")?;
289 validate_precision_bytes(ask_sizes[i], "ask_size")?;
290 }
291
292 let flags = extract_column::<UInt8Array>(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?;
293 let sequence =
294 extract_column::<UInt64Array>(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?;
295 let ts_event =
296 extract_column::<UInt64Array>(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?;
297 let ts_init =
298 extract_column::<UInt64Array>(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?;
299
300 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
302 .map(|row| {
303 let mut bids = [BookOrder::default(); DEPTH10_LEN];
304 let mut asks = [BookOrder::default(); DEPTH10_LEN];
305 let mut bid_count_arr = [0u32; DEPTH10_LEN];
306 let mut ask_count_arr = [0u32; DEPTH10_LEN];
307
308 for i in 0..DEPTH10_LEN {
309 let bid_price_bytes = bid_prices[i].value(row);
313 let bid_size_bytes = bid_sizes[i].value(row);
314 if get_raw_price(bid_price_bytes) == PRICE_UNDEF
315 || get_raw_quantity(bid_size_bytes) == QUANTITY_UNDEF
316 {
317 bids[i] = BookOrder::default();
318 } else {
319 let bid_price =
320 decode_price(bid_price_bytes, price_precision, "bid_price", row)?;
321 let bid_size =
322 decode_quantity(bid_size_bytes, size_precision, "bid_size", row)?;
323 bids[i] = BookOrder::new(OrderSide::Buy, bid_price, bid_size, 0);
324 }
325
326 let ask_price_bytes = ask_prices[i].value(row);
327 let ask_size_bytes = ask_sizes[i].value(row);
328 if get_raw_price(ask_price_bytes) == PRICE_UNDEF
329 || get_raw_quantity(ask_size_bytes) == QUANTITY_UNDEF
330 {
331 asks[i] = BookOrder::default();
332 } else {
333 let ask_price =
334 decode_price(ask_price_bytes, price_precision, "ask_price", row)?;
335 let ask_size =
336 decode_quantity(ask_size_bytes, size_precision, "ask_size", row)?;
337 asks[i] = BookOrder::new(OrderSide::Sell, ask_price, ask_size, 0);
338 }
339
340 bid_count_arr[i] = bid_counts[i].value(row);
341 ask_count_arr[i] = ask_counts[i].value(row);
342 }
343
344 Ok(Self {
345 instrument_id,
346 bids,
347 asks,
348 bid_counts: bid_count_arr,
349 ask_counts: ask_count_arr,
350 flags: flags.value(row),
351 sequence: sequence.value(row),
352 ts_event: ts_event.value(row).into(),
353 ts_init: ts_init.value(row).into(),
354 })
355 })
356 .collect();
357
358 result
359 }
360}
361
362impl DecodeDataFromRecordBatch for OrderBookDepth10 {
363 fn decode_data_batch(
364 metadata: &HashMap<String, String>,
365 record_batch: RecordBatch,
366 ) -> Result<Vec<Data>, EncodingError> {
367 let depths: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
368 Ok(depths.into_iter().map(Data::from).collect())
369 }
370}
371
372#[cfg(test)]
373mod tests {
374 use arrow::datatypes::{DataType, Field};
375 use nautilus_model::{
376 data::stubs::stub_depth10,
377 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
378 };
379 use pretty_assertions::assert_eq;
380 use rstest::rstest;
381
382 use super::*;
383 use crate::arrow::{get_raw_price, get_raw_quantity};
384
385 #[rstest]
386 fn test_get_schema() {
387 let instrument_id = InstrumentId::from("AAPL.XNAS");
388 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
389 let schema = OrderBookDepth10::get_schema(Some(metadata));
390
391 let mut group_count = 0;
392 let field_data = get_field_data();
393 for (name, data_type) in field_data {
394 for i in 0..DEPTH10_LEN {
395 let field = schema.field(i + group_count * DEPTH10_LEN).clone();
396 assert_eq!(
397 field,
398 Field::new(format!("{name}_{i}"), data_type.clone(), false)
399 );
400 }
401
402 group_count += 1;
403 }
404
405 let flags_field = schema.field(group_count * DEPTH10_LEN).clone();
406 assert_eq!(flags_field, Field::new("flags", DataType::UInt8, false));
407 let sequence_field = schema.field(group_count * DEPTH10_LEN + 1).clone();
408 assert_eq!(
409 sequence_field,
410 Field::new("sequence", DataType::UInt64, false)
411 );
412 let ts_event_field = schema.field(group_count * DEPTH10_LEN + 2).clone();
413 assert_eq!(
414 ts_event_field,
415 Field::new("ts_event", DataType::UInt64, false)
416 );
417 let ts_init_field = schema.field(group_count * DEPTH10_LEN + 3).clone();
418 assert_eq!(
419 ts_init_field,
420 Field::new("ts_init", DataType::UInt64, false)
421 );
422
423 assert_eq!(schema.metadata()["instrument_id"], "AAPL.XNAS");
424 assert_eq!(schema.metadata()["price_precision"], "2");
425 assert_eq!(schema.metadata()["size_precision"], "0");
426 }
427
428 #[rstest]
429 fn test_get_schema_map() {
430 let schema_map = OrderBookDepth10::get_schema_map();
431
432 let field_data = get_field_data();
433 for (name, data_type) in field_data {
434 for i in 0..DEPTH10_LEN {
435 let field = schema_map.get(&format!("{name}_{i}")).map(String::as_str);
436 assert_eq!(field, Some(format!("{data_type:?}").as_str()));
437 }
438 }
439
440 assert_eq!(schema_map.get("flags").map(String::as_str), Some("UInt8"));
441 assert_eq!(
442 schema_map.get("sequence").map(String::as_str),
443 Some("UInt64")
444 );
445 assert_eq!(
446 schema_map.get("ts_event").map(String::as_str),
447 Some("UInt64")
448 );
449 assert_eq!(
450 schema_map.get("ts_init").map(String::as_str),
451 Some("UInt64")
452 );
453 }
454
455 #[rstest]
456 fn test_encode_batch(stub_depth10: OrderBookDepth10) {
457 let instrument_id = InstrumentId::from("AAPL.XNAS");
458 let price_precision = 2;
459 let metadata = OrderBookDepth10::get_metadata(&instrument_id, price_precision, 0);
460
461 let data = vec![stub_depth10];
462 let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
463 let columns = record_batch.columns();
464
465 assert_eq!(columns.len(), DEPTH10_LEN * 6 + 4);
466
467 let bid_prices: Vec<_> = (0..DEPTH10_LEN)
469 .map(|i| {
470 columns[i]
471 .as_any()
472 .downcast_ref::<FixedSizeBinaryArray>()
473 .unwrap()
474 })
475 .collect();
476
477 let expected_bid_prices: Vec<f64> =
478 vec![99.0, 98.0, 97.0, 96.0, 95.0, 94.0, 93.0, 92.0, 91.0, 90.0];
479
480 for (i, bid_price) in bid_prices.iter().enumerate() {
481 assert_eq!(bid_price.len(), 1);
482 assert_eq!(
483 get_raw_price(bid_price.value(0)),
484 (expected_bid_prices[i] * FIXED_SCALAR) as PriceRaw
485 );
486 assert_eq!(
487 Price::from_raw(get_raw_price(bid_price.value(0)), price_precision).as_f64(),
488 expected_bid_prices[i]
489 );
490 }
491
492 let ask_prices: Vec<_> = (0..DEPTH10_LEN)
494 .map(|i| {
495 columns[DEPTH10_LEN + i]
496 .as_any()
497 .downcast_ref::<FixedSizeBinaryArray>()
498 .unwrap()
499 })
500 .collect();
501
502 let expected_ask_prices: Vec<f64> = vec![
503 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0,
504 ];
505
506 for (i, ask_price) in ask_prices.iter().enumerate() {
507 assert_eq!(ask_price.len(), 1);
508 assert_eq!(
509 get_raw_price(ask_price.value(0)),
510 (expected_ask_prices[i] * FIXED_SCALAR) as PriceRaw
511 );
512 assert_eq!(
513 Price::from_raw(get_raw_price(ask_price.value(0)), price_precision).as_f64(),
514 expected_ask_prices[i]
515 );
516 }
517
518 let bid_sizes: Vec<_> = (0..DEPTH10_LEN)
520 .map(|i| {
521 columns[2 * DEPTH10_LEN + i]
522 .as_any()
523 .downcast_ref::<FixedSizeBinaryArray>()
524 .unwrap()
525 })
526 .collect();
527
528 for (i, bid_size) in bid_sizes.iter().enumerate() {
529 assert_eq!(bid_size.len(), 1);
530 assert_eq!(
531 get_raw_quantity(bid_size.value(0)),
532 ((100.0 * FIXED_SCALAR * (i + 1) as f64) as QuantityRaw)
533 );
534 }
535
536 let ask_sizes: Vec<_> = (0..DEPTH10_LEN)
538 .map(|i| {
539 columns[3 * DEPTH10_LEN + i]
540 .as_any()
541 .downcast_ref::<FixedSizeBinaryArray>()
542 .unwrap()
543 })
544 .collect();
545
546 for (i, ask_size) in ask_sizes.iter().enumerate() {
547 assert_eq!(ask_size.len(), 1);
548 assert_eq!(
549 get_raw_quantity(ask_size.value(0)),
550 ((100.0 * FIXED_SCALAR * ((i + 1) as f64)) as QuantityRaw)
551 );
552 }
553
554 let bid_counts: Vec<_> = (0..DEPTH10_LEN)
556 .map(|i| {
557 columns[4 * DEPTH10_LEN + i]
558 .as_any()
559 .downcast_ref::<UInt32Array>()
560 .unwrap()
561 })
562 .collect();
563
564 for count_values in bid_counts {
565 assert_eq!(count_values.len(), 1);
566 assert_eq!(count_values.value(0), 1);
567 }
568
569 let ask_counts: Vec<_> = (0..DEPTH10_LEN)
571 .map(|i| {
572 columns[5 * DEPTH10_LEN + i]
573 .as_any()
574 .downcast_ref::<UInt32Array>()
575 .unwrap()
576 })
577 .collect();
578
579 for count_values in ask_counts {
580 assert_eq!(count_values.len(), 1);
581 assert_eq!(count_values.value(0), 1);
582 }
583
584 let flags_values = columns[6 * DEPTH10_LEN]
586 .as_any()
587 .downcast_ref::<UInt8Array>()
588 .unwrap();
589 let sequence_values = columns[6 * DEPTH10_LEN + 1]
590 .as_any()
591 .downcast_ref::<UInt64Array>()
592 .unwrap();
593 let ts_event_values = columns[6 * DEPTH10_LEN + 2]
594 .as_any()
595 .downcast_ref::<UInt64Array>()
596 .unwrap();
597 let ts_init_values = columns[6 * DEPTH10_LEN + 3]
598 .as_any()
599 .downcast_ref::<UInt64Array>()
600 .unwrap();
601
602 assert_eq!(flags_values.len(), 1);
603 assert_eq!(flags_values.value(0), 0);
604 assert_eq!(sequence_values.len(), 1);
605 assert_eq!(sequence_values.value(0), 0);
606 assert_eq!(ts_event_values.len(), 1);
607 assert_eq!(ts_event_values.value(0), 1);
608 assert_eq!(ts_init_values.len(), 1);
609 assert_eq!(ts_init_values.value(0), 2);
610 }
611
612 #[rstest]
613 fn test_decode_batch(stub_depth10: OrderBookDepth10) {
614 let instrument_id = InstrumentId::from("AAPL.XNAS");
615 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
616
617 let data = vec![stub_depth10];
618 let record_batch = OrderBookDepth10::encode_batch(&metadata, &data).unwrap();
619 let decoded_data = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
620
621 assert_eq!(decoded_data.len(), 1);
622 }
623
624 #[rstest]
625 fn test_decode_batch_missing_instrument_id_returns_error(stub_depth10: OrderBookDepth10) {
626 let instrument_id = InstrumentId::from("AAPL.XNAS");
627 let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
628 let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
629
630 metadata.remove(KEY_INSTRUMENT_ID);
631
632 let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
633 assert!(result.is_err());
634 let err = result.unwrap_err();
635 assert!(
636 err.to_string().contains("instrument_id"),
637 "Expected missing instrument_id error, was: {err}"
638 );
639 }
640
641 #[rstest]
642 fn test_decode_batch_missing_price_precision_returns_error(stub_depth10: OrderBookDepth10) {
643 let instrument_id = InstrumentId::from("AAPL.XNAS");
644 let mut metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
645 let record_batch = OrderBookDepth10::encode_batch(&metadata, &[stub_depth10]).unwrap();
646
647 metadata.remove(KEY_PRICE_PRECISION);
648
649 let result = OrderBookDepth10::decode_batch(&metadata, record_batch);
650 assert!(result.is_err());
651 let err = result.unwrap_err();
652 assert!(
653 err.to_string().contains("price_precision"),
654 "Expected missing price_precision error, was: {err}"
655 );
656 }
657
658 #[rstest]
659 fn test_encode_decode_round_trip(stub_depth10: OrderBookDepth10) {
660 let instrument_id = InstrumentId::from("AAPL.XNAS");
661 let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0);
662
663 let original = vec![stub_depth10];
664 let record_batch = OrderBookDepth10::encode_batch(&metadata, &original).unwrap();
665 let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
666
667 assert_eq!(decoded.len(), original.len());
668 let orig = &original[0];
669 let dec = &decoded[0];
670
671 assert_eq!(dec.instrument_id, orig.instrument_id);
672 assert_eq!(dec.flags, orig.flags);
673 assert_eq!(dec.sequence, orig.sequence);
674 assert_eq!(dec.ts_event, orig.ts_event);
675 assert_eq!(dec.ts_init, orig.ts_init);
676
677 for i in 0..DEPTH10_LEN {
678 assert_eq!(
679 dec.bids[i].price, orig.bids[i].price,
680 "bid price mismatch at level {i}"
681 );
682 assert_eq!(
683 dec.bids[i].size, orig.bids[i].size,
684 "bid size mismatch at level {i}"
685 );
686 assert_eq!(
687 dec.asks[i].price, orig.asks[i].price,
688 "ask price mismatch at level {i}"
689 );
690 assert_eq!(
691 dec.asks[i].size, orig.asks[i].size,
692 "ask size mismatch at level {i}"
693 );
694 }
695 }
696
697 #[rstest]
701 #[case::price_only(true, false)]
702 #[case::size_only(false, true)]
703 #[case::both(true, true)]
704 #[case::neither(false, false)]
705 fn test_decode_batch_with_undefined_levels(
706 stub_depth10: OrderBookDepth10,
707 #[case] price_undef: bool,
708 #[case] size_undef: bool,
709 ) {
710 let instrument_id = InstrumentId::from("AAPL.XNAS");
711 let price_precision = 2;
712 let size_precision = 0;
713 let metadata =
714 OrderBookDepth10::get_metadata(&instrument_id, price_precision, size_precision);
715
716 let mut depth = stub_depth10;
717 let original_bid = depth.bids[5];
718 let original_ask = depth.asks[7];
719 let sentinel_bid_price = if price_undef {
720 Price::from_raw(PRICE_UNDEF, 0)
721 } else {
722 original_bid.price
723 };
724 let sentinel_bid_size = if size_undef {
725 Quantity::from_raw(QUANTITY_UNDEF, 0)
726 } else {
727 original_bid.size
728 };
729 depth.bids[5] = BookOrder {
730 side: OrderSide::Buy,
731 price: sentinel_bid_price,
732 size: sentinel_bid_size,
733 order_id: 0,
734 };
735 let sentinel_ask_price = if price_undef {
736 Price::from_raw(PRICE_UNDEF, 0)
737 } else {
738 original_ask.price
739 };
740 let sentinel_ask_size = if size_undef {
741 Quantity::from_raw(QUANTITY_UNDEF, 0)
742 } else {
743 original_ask.size
744 };
745 depth.asks[7] = BookOrder {
746 side: OrderSide::Sell,
747 price: sentinel_ask_price,
748 size: sentinel_ask_size,
749 order_id: 0,
750 };
751
752 let record_batch = OrderBookDepth10::encode_batch(&metadata, &[depth]).unwrap();
753 let decoded = OrderBookDepth10::decode_batch(&metadata, record_batch).unwrap();
754
755 assert_eq!(decoded.len(), 1);
756 let decoded = &decoded[0];
757
758 let expect_null = price_undef || size_undef;
759 if expect_null {
760 assert_eq!(decoded.bids[5].side, OrderSide::NoOrderSide);
761 assert_eq!(decoded.bids[5].price.raw, 0);
762 assert_eq!(decoded.bids[5].price.precision, 0);
763 assert_eq!(decoded.bids[5].size.raw, 0);
764 assert_eq!(decoded.bids[5].size.precision, 0);
765
766 assert_eq!(decoded.asks[7].side, OrderSide::NoOrderSide);
767 assert_eq!(decoded.asks[7].price.raw, 0);
768 assert_eq!(decoded.asks[7].price.precision, 0);
769 assert_eq!(decoded.asks[7].size.raw, 0);
770 assert_eq!(decoded.asks[7].size.precision, 0);
771 } else {
772 assert_eq!(decoded.bids[5].side, OrderSide::Buy);
773 assert_eq!(decoded.bids[5].price, original_bid.price);
774 assert_eq!(decoded.bids[5].size, original_bid.size);
775 assert_eq!(decoded.asks[7].side, OrderSide::Sell);
776 assert_eq!(decoded.asks[7].price, original_ask.price);
777 assert_eq!(decoded.asks[7].size, original_ask.size);
778 }
779
780 assert_eq!(decoded.bids[0].side, OrderSide::Buy);
782 assert_eq!(decoded.bids[0].price.precision, price_precision);
783 assert_eq!(decoded.bids[0].size.precision, size_precision);
784 assert_eq!(decoded.asks[0].side, OrderSide::Sell);
785 assert_eq!(decoded.asks[0].price.precision, price_precision);
786 assert_eq!(decoded.asks[0].size.precision, size_precision);
787 }
788}