1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{FixedSizeBinaryArray, FixedSizeBinaryBuilder, UInt8Array, UInt64Array},
20 datatypes::{DataType, Field, Schema},
21 error::ArrowError,
22 record_batch::RecordBatch,
23};
24use nautilus_model::{
25 data::{BookOrder, OrderBookDelta},
26 enums::{BookAction, FromU8, OrderSide},
27 identifiers::InstrumentId,
28 types::fixed::PRECISION_BYTES,
29};
30
31use super::{
32 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
33 KEY_SIZE_PRECISION, decode_price_with_sentinel, decode_quantity_with_sentinel, extract_column,
34};
35use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
36
37impl ArrowSchemaProvider for OrderBookDelta {
38 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
39 let fields = vec![
40 Field::new("action", DataType::UInt8, false),
41 Field::new("side", DataType::UInt8, false),
42 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
43 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44 Field::new("order_id", DataType::UInt64, false),
45 Field::new("flags", DataType::UInt8, false),
46 Field::new("sequence", DataType::UInt64, false),
47 Field::new("ts_event", DataType::UInt64, false),
48 Field::new("ts_init", DataType::UInt64, false),
49 ];
50
51 match metadata {
52 Some(metadata) => Schema::new_with_metadata(fields, metadata),
53 None => Schema::new(fields),
54 }
55 }
56}
57
58fn parse_metadata(
59 metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61 let instrument_id_str = metadata
62 .get(KEY_INSTRUMENT_ID)
63 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64 let instrument_id = InstrumentId::from_str(instrument_id_str)
65 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67 let price_precision = metadata
68 .get(KEY_PRICE_PRECISION)
69 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70 .parse::<u8>()
71 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73 let size_precision = metadata
74 .get(KEY_SIZE_PRECISION)
75 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76 .parse::<u8>()
77 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79 Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for OrderBookDelta {
83 fn encode_batch(
84 metadata: &HashMap<String, String>,
85 data: &[Self],
86 ) -> Result<RecordBatch, ArrowError> {
87 let mut action_builder = UInt8Array::builder(data.len());
88 let mut side_builder = UInt8Array::builder(data.len());
89 let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
90 let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
91 let mut order_id_builder = UInt64Array::builder(data.len());
92 let mut flags_builder = UInt8Array::builder(data.len());
93 let mut sequence_builder = UInt64Array::builder(data.len());
94 let mut ts_event_builder = UInt64Array::builder(data.len());
95 let mut ts_init_builder = UInt64Array::builder(data.len());
96
97 for delta in data {
98 action_builder.append_value(delta.action as u8);
99 side_builder.append_value(delta.order.side as u8);
100 price_builder
101 .append_value(delta.order.price.raw.to_le_bytes())
102 .unwrap();
103 size_builder
104 .append_value(delta.order.size.raw.to_le_bytes())
105 .unwrap();
106 order_id_builder.append_value(delta.order.order_id);
107 flags_builder.append_value(delta.flags);
108 sequence_builder.append_value(delta.sequence);
109 ts_event_builder.append_value(delta.ts_event.as_u64());
110 ts_init_builder.append_value(delta.ts_init.as_u64());
111 }
112
113 let action_array = action_builder.finish();
114 let side_array = side_builder.finish();
115 let price_array = price_builder.finish();
116 let size_array = size_builder.finish();
117 let order_id_array = order_id_builder.finish();
118 let flags_array = flags_builder.finish();
119 let sequence_array = sequence_builder.finish();
120 let ts_event_array = ts_event_builder.finish();
121 let ts_init_array = ts_init_builder.finish();
122
123 RecordBatch::try_new(
124 Self::get_schema(Some(metadata.clone())).into(),
125 vec![
126 Arc::new(action_array),
127 Arc::new(side_array),
128 Arc::new(price_array),
129 Arc::new(size_array),
130 Arc::new(order_id_array),
131 Arc::new(flags_array),
132 Arc::new(sequence_array),
133 Arc::new(ts_event_array),
134 Arc::new(ts_init_array),
135 ],
136 )
137 }
138
139 fn metadata(&self) -> HashMap<String, String> {
140 Self::get_metadata(
141 &self.instrument_id,
142 self.order.price.precision,
143 self.order.size.precision,
144 )
145 }
146
147 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
151 let delta = chunk
152 .first()
153 .expect("Chunk should have at least one element to encode");
154
155 if delta.order.price.precision == 0
156 && delta.order.size.precision == 0
157 && let Some(delta) = chunk.get(1)
158 {
159 return EncodeToRecordBatch::metadata(delta);
160 }
161
162 EncodeToRecordBatch::metadata(delta)
163 }
164}
165
166impl DecodeFromRecordBatch for OrderBookDelta {
167 fn decode_batch(
168 metadata: &HashMap<String, String>,
169 record_batch: RecordBatch,
170 ) -> Result<Vec<Self>, EncodingError> {
171 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
172 let cols = record_batch.columns();
173
174 let action_values = extract_column::<UInt8Array>(cols, "action", 0, DataType::UInt8)?;
175 let side_values = extract_column::<UInt8Array>(cols, "side", 1, DataType::UInt8)?;
176 let price_values = extract_column::<FixedSizeBinaryArray>(
177 cols,
178 "price",
179 2,
180 DataType::FixedSizeBinary(PRECISION_BYTES),
181 )?;
182 let size_values = extract_column::<FixedSizeBinaryArray>(
183 cols,
184 "size",
185 3,
186 DataType::FixedSizeBinary(PRECISION_BYTES),
187 )?;
188 let order_id_values = extract_column::<UInt64Array>(cols, "order_id", 4, DataType::UInt64)?;
189 let flags_values = extract_column::<UInt8Array>(cols, "flags", 5, DataType::UInt8)?;
190 let sequence_values = extract_column::<UInt64Array>(cols, "sequence", 6, DataType::UInt64)?;
191 let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 7, DataType::UInt64)?;
192 let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 8, DataType::UInt64)?;
193
194 if price_values.value_length() != PRECISION_BYTES {
195 return Err(EncodingError::ParseError(
196 "price",
197 format!(
198 "Invalid value length: expected {PRECISION_BYTES}, found {}",
199 price_values.value_length()
200 ),
201 ));
202 }
203
204 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
205 .map(|i| {
206 let action_value = action_values.value(i);
207 let action = BookAction::from_u8(action_value).ok_or_else(|| {
208 EncodingError::ParseError(
209 stringify!(BookAction),
210 format!("Invalid enum value, was {action_value}"),
211 )
212 })?;
213 let side_value = side_values.value(i);
214 let side = OrderSide::from_u8(side_value).ok_or_else(|| {
215 EncodingError::ParseError(
216 stringify!(OrderSide),
217 format!("Invalid enum value, was {side_value}"),
218 )
219 })?;
220 let price =
221 decode_price_with_sentinel(price_values.value(i), price_precision, "price", i)?;
222 let size =
223 decode_quantity_with_sentinel(size_values.value(i), size_precision, "size", i)?;
224 let order_id = order_id_values.value(i);
225 let flags = flags_values.value(i);
226 let sequence = sequence_values.value(i);
227 let ts_event = ts_event_values.value(i).into();
228 let ts_init = ts_init_values.value(i).into();
229
230 Ok(Self {
231 instrument_id,
232 action,
233 order: BookOrder {
234 side,
235 price,
236 size,
237 order_id,
238 },
239 flags,
240 sequence,
241 ts_event,
242 ts_init,
243 })
244 })
245 .collect();
246
247 result
248 }
249}
250
251impl DecodeDataFromRecordBatch for OrderBookDelta {
252 fn decode_data_batch(
253 metadata: &HashMap<String, String>,
254 record_batch: RecordBatch,
255 ) -> Result<Vec<Data>, EncodingError> {
256 let deltas: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
257 Ok(deltas.into_iter().map(Data::from).collect())
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use std::sync::Arc;
264
265 use arrow::{array::Array, record_batch::RecordBatch};
266 use nautilus_model::types::{
267 Price, Quantity,
268 fixed::FIXED_SCALAR,
269 price::{PRICE_UNDEF, PriceRaw},
270 quantity::{QUANTITY_UNDEF, QuantityRaw},
271 };
272 use pretty_assertions::assert_eq;
273 use rstest::rstest;
274
275 use super::*;
276 use crate::arrow::get_raw_price;
277
278 #[rstest]
279 fn test_get_schema() {
280 let instrument_id = InstrumentId::from("AAPL.XNAS");
281 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
282 let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
283
284 let expected_fields = vec![
285 Field::new("action", DataType::UInt8, false),
286 Field::new("side", DataType::UInt8, false),
287 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
288 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
289 Field::new("order_id", DataType::UInt64, false),
290 Field::new("flags", DataType::UInt8, false),
291 Field::new("sequence", DataType::UInt64, false),
292 Field::new("ts_event", DataType::UInt64, false),
293 Field::new("ts_init", DataType::UInt64, false),
294 ];
295
296 let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
297 assert_eq!(schema, expected_schema);
298 }
299
300 #[rstest]
301 fn test_get_schema_map() {
302 let schema_map = OrderBookDelta::get_schema_map();
303 let fixed_size_binary = format!("FixedSizeBinary({PRECISION_BYTES})");
304
305 assert_eq!(schema_map.get("action").unwrap(), "UInt8");
306 assert_eq!(schema_map.get("side").unwrap(), "UInt8");
307 assert_eq!(*schema_map.get("price").unwrap(), fixed_size_binary);
308 assert_eq!(*schema_map.get("size").unwrap(), fixed_size_binary);
309 assert_eq!(schema_map.get("order_id").unwrap(), "UInt64");
310 assert_eq!(schema_map.get("flags").unwrap(), "UInt8");
311 assert_eq!(schema_map.get("sequence").unwrap(), "UInt64");
312 assert_eq!(schema_map.get("ts_event").unwrap(), "UInt64");
313 assert_eq!(schema_map.get("ts_init").unwrap(), "UInt64");
314 }
315
316 #[rstest]
317 fn test_encode_batch() {
318 let instrument_id = InstrumentId::from("AAPL.XNAS");
319 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
320
321 let delta1 = OrderBookDelta {
322 instrument_id,
323 action: BookAction::Add,
324 order: BookOrder {
325 side: OrderSide::Buy,
326 price: Price::from("100.10"),
327 size: Quantity::from(100),
328 order_id: 1,
329 },
330 flags: 0,
331 sequence: 1,
332 ts_event: 1.into(),
333 ts_init: 3.into(),
334 };
335
336 let delta2 = OrderBookDelta {
337 instrument_id,
338 action: BookAction::Update,
339 order: BookOrder {
340 side: OrderSide::Sell,
341 price: Price::from("101.20"),
342 size: Quantity::from(200),
343 order_id: 2,
344 },
345 flags: 1,
346 sequence: 2,
347 ts_event: 2.into(),
348 ts_init: 4.into(),
349 };
350
351 let data = vec![delta1, delta2];
352 let record_batch = OrderBookDelta::encode_batch(&metadata, &data).unwrap();
353
354 let columns = record_batch.columns();
355 let action_values = columns[0].as_any().downcast_ref::<UInt8Array>().unwrap();
356 let side_values = columns[1].as_any().downcast_ref::<UInt8Array>().unwrap();
357 let price_values = columns[2]
358 .as_any()
359 .downcast_ref::<FixedSizeBinaryArray>()
360 .unwrap();
361 let size_values = columns[3]
362 .as_any()
363 .downcast_ref::<FixedSizeBinaryArray>()
364 .unwrap();
365 let order_id_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
366 let flags_values = columns[5].as_any().downcast_ref::<UInt8Array>().unwrap();
367 let sequence_values = columns[6].as_any().downcast_ref::<UInt64Array>().unwrap();
368 let ts_event_values = columns[7].as_any().downcast_ref::<UInt64Array>().unwrap();
369 let ts_init_values = columns[8].as_any().downcast_ref::<UInt64Array>().unwrap();
370
371 assert_eq!(columns.len(), 9);
372 assert_eq!(action_values.len(), 2);
373 assert_eq!(action_values.value(0), 1);
374 assert_eq!(action_values.value(1), 2);
375 assert_eq!(side_values.len(), 2);
376 assert_eq!(side_values.value(0), 1);
377 assert_eq!(side_values.value(1), 2);
378
379 assert_eq!(price_values.len(), 2);
380 assert_eq!(
381 get_raw_price(price_values.value(0)),
382 (100.10 * FIXED_SCALAR) as PriceRaw
383 );
384 assert_eq!(
385 get_raw_price(price_values.value(1)),
386 (101.20 * FIXED_SCALAR) as PriceRaw
387 );
388
389 assert_eq!(size_values.len(), 2);
390 assert_eq!(
391 get_raw_price(size_values.value(0)),
392 (100.0 * FIXED_SCALAR) as PriceRaw
393 );
394 assert_eq!(
395 get_raw_price(size_values.value(1)),
396 (200.0 * FIXED_SCALAR) as PriceRaw
397 );
398 assert_eq!(order_id_values.len(), 2);
399 assert_eq!(order_id_values.value(0), 1);
400 assert_eq!(order_id_values.value(1), 2);
401 assert_eq!(flags_values.len(), 2);
402 assert_eq!(flags_values.value(0), 0);
403 assert_eq!(flags_values.value(1), 1);
404 assert_eq!(sequence_values.len(), 2);
405 assert_eq!(sequence_values.value(0), 1);
406 assert_eq!(sequence_values.value(1), 2);
407 assert_eq!(ts_event_values.len(), 2);
408 assert_eq!(ts_event_values.value(0), 1);
409 assert_eq!(ts_event_values.value(1), 2);
410 assert_eq!(ts_init_values.len(), 2);
411 assert_eq!(ts_init_values.value(0), 3);
412 assert_eq!(ts_init_values.value(1), 4);
413 }
414
415 #[rstest]
416 fn test_decode_batch() {
417 let instrument_id = InstrumentId::from("AAPL.XNAS");
418 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
419
420 let action = UInt8Array::from(vec![1, 2]);
421 let side = UInt8Array::from(vec![1, 1]);
422 let price = FixedSizeBinaryArray::from(vec![
423 &((101.10 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
424 &((101.20 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
425 ]);
426 let size = FixedSizeBinaryArray::from(vec![
427 &((10000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
428 &((9000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
429 ]);
430 let order_id = UInt64Array::from(vec![1, 2]);
431 let flags = UInt8Array::from(vec![0, 0]);
432 let sequence = UInt64Array::from(vec![1, 2]);
433 let ts_event = UInt64Array::from(vec![1, 2]);
434 let ts_init = UInt64Array::from(vec![3, 4]);
435
436 let record_batch = RecordBatch::try_new(
437 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
438 vec![
439 Arc::new(action),
440 Arc::new(side),
441 Arc::new(price),
442 Arc::new(size),
443 Arc::new(order_id),
444 Arc::new(flags),
445 Arc::new(sequence),
446 Arc::new(ts_event),
447 Arc::new(ts_init),
448 ],
449 )
450 .unwrap();
451
452 let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
453 assert_eq!(decoded_data.len(), 2);
454 }
455
456 #[rstest]
457 fn test_decode_batch_with_undef_values() {
458 let instrument_id = InstrumentId::from("PLTR.XNAS");
459 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
460
461 let action = UInt8Array::from(vec![4, 1]); let side = UInt8Array::from(vec![0, 1]); let price = FixedSizeBinaryArray::from(vec![
465 &PRICE_UNDEF.to_le_bytes(),
466 &((100.50 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
467 ]);
468 let size = FixedSizeBinaryArray::from(vec![
469 &QUANTITY_UNDEF.to_le_bytes(),
470 &((1000.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes(),
471 ]);
472 let order_id = UInt64Array::from(vec![0, 1]);
473 let flags = UInt8Array::from(vec![0, 0]);
474 let sequence = UInt64Array::from(vec![1, 2]);
475 let ts_event = UInt64Array::from(vec![1, 2]);
476 let ts_init = UInt64Array::from(vec![3, 4]);
477
478 let record_batch = RecordBatch::try_new(
479 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
480 vec![
481 Arc::new(action),
482 Arc::new(side),
483 Arc::new(price),
484 Arc::new(size),
485 Arc::new(order_id),
486 Arc::new(flags),
487 Arc::new(sequence),
488 Arc::new(ts_event),
489 Arc::new(ts_init),
490 ],
491 )
492 .unwrap();
493
494 let decoded_data = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
495 assert_eq!(decoded_data.len(), 2);
496 assert_eq!(decoded_data[0].order.price.raw, PRICE_UNDEF);
497 assert_eq!(decoded_data[0].order.price.precision, 0);
498 assert_eq!(decoded_data[0].order.size.raw, QUANTITY_UNDEF);
499 assert_eq!(decoded_data[0].order.size.precision, 0);
500 assert_eq!(decoded_data[1].order.price.precision, 2);
501 assert_eq!(decoded_data[1].order.size.precision, 0);
502 }
503
504 #[rstest]
505 fn test_decode_batch_invalid_price_returns_error() {
506 let instrument_id = InstrumentId::from("AAPL.XNAS");
507 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
508
509 let action = UInt8Array::from(vec![1]);
510 let side = UInt8Array::from(vec![1]);
511
512 let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
513 let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
514 let size = FixedSizeBinaryArray::from(vec![
515 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
516 ]);
517 let order_id = UInt64Array::from(vec![1]);
518 let flags = UInt8Array::from(vec![0]);
519 let sequence = UInt64Array::from(vec![1]);
520 let ts_event = UInt64Array::from(vec![1]);
521 let ts_init = UInt64Array::from(vec![2]);
522
523 let record_batch = RecordBatch::try_new(
524 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
525 vec![
526 Arc::new(action),
527 Arc::new(side),
528 Arc::new(price),
529 Arc::new(size),
530 Arc::new(order_id),
531 Arc::new(flags),
532 Arc::new(sequence),
533 Arc::new(ts_event),
534 Arc::new(ts_init),
535 ],
536 )
537 .unwrap();
538
539 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
540 assert!(result.is_err());
541 let err = result.unwrap_err();
542 assert!(
543 err.to_string().contains("price") && err.to_string().contains("row 0"),
544 "Expected price error at row 0, got: {err}"
545 );
546 }
547
548 #[rstest]
549 fn test_decode_batch_invalid_action_returns_error() {
550 let instrument_id = InstrumentId::from("AAPL.XNAS");
551 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
552
553 let action = UInt8Array::from(vec![99]);
554 let side = UInt8Array::from(vec![1]);
555 let price =
556 FixedSizeBinaryArray::from(vec![&((100.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes()]);
557 let size = FixedSizeBinaryArray::from(vec![
558 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
559 ]);
560 let order_id = UInt64Array::from(vec![1]);
561 let flags = UInt8Array::from(vec![0]);
562 let sequence = UInt64Array::from(vec![1]);
563 let ts_event = UInt64Array::from(vec![1]);
564 let ts_init = UInt64Array::from(vec![2]);
565
566 let record_batch = RecordBatch::try_new(
567 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
568 vec![
569 Arc::new(action),
570 Arc::new(side),
571 Arc::new(price),
572 Arc::new(size),
573 Arc::new(order_id),
574 Arc::new(flags),
575 Arc::new(sequence),
576 Arc::new(ts_event),
577 Arc::new(ts_init),
578 ],
579 )
580 .unwrap();
581
582 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
583 assert!(result.is_err());
584 let err = result.unwrap_err();
585 assert!(
586 err.to_string().contains("BookAction"),
587 "Expected BookAction error, got: {err}"
588 );
589 }
590
591 #[rstest]
592 fn test_decode_batch_missing_instrument_id_returns_error() {
593 let instrument_id = InstrumentId::from("AAPL.XNAS");
594 let mut metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
595 metadata.remove(KEY_INSTRUMENT_ID);
596
597 let action = UInt8Array::from(vec![1]);
598 let side = UInt8Array::from(vec![1]);
599 let price =
600 FixedSizeBinaryArray::from(vec![&((100.0 * FIXED_SCALAR) as PriceRaw).to_le_bytes()]);
601 let size = FixedSizeBinaryArray::from(vec![
602 &((100.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
603 ]);
604 let order_id = UInt64Array::from(vec![1]);
605 let flags = UInt8Array::from(vec![0]);
606 let sequence = UInt64Array::from(vec![1]);
607 let ts_event = UInt64Array::from(vec![1]);
608 let ts_init = UInt64Array::from(vec![2]);
609
610 let record_batch = RecordBatch::try_new(
611 OrderBookDelta::get_schema(Some(metadata.clone())).into(),
612 vec![
613 Arc::new(action),
614 Arc::new(side),
615 Arc::new(price),
616 Arc::new(size),
617 Arc::new(order_id),
618 Arc::new(flags),
619 Arc::new(sequence),
620 Arc::new(ts_event),
621 Arc::new(ts_init),
622 ],
623 )
624 .unwrap();
625
626 let result = OrderBookDelta::decode_batch(&metadata, record_batch);
627 assert!(result.is_err());
628 let err = result.unwrap_err();
629 assert!(
630 err.to_string().contains("instrument_id"),
631 "Expected missing instrument_id error, got: {err}"
632 );
633 }
634
635 #[rstest]
636 fn test_encode_decode_round_trip() {
637 let instrument_id = InstrumentId::from("AAPL.XNAS");
638 let metadata = OrderBookDelta::get_metadata(&instrument_id, 2, 0);
639
640 let delta1 = OrderBookDelta {
641 instrument_id,
642 action: BookAction::Add,
643 order: BookOrder {
644 side: OrderSide::Buy,
645 price: Price::from("100.10"),
646 size: Quantity::from(100),
647 order_id: 1,
648 },
649 flags: 0,
650 sequence: 1,
651 ts_event: 1_000_000_000.into(),
652 ts_init: 1_000_000_001.into(),
653 };
654
655 let delta2 = OrderBookDelta {
656 instrument_id,
657 action: BookAction::Update,
658 order: BookOrder {
659 side: OrderSide::Sell,
660 price: Price::from("101.20"),
661 size: Quantity::from(200),
662 order_id: 2,
663 },
664 flags: 1,
665 sequence: 2,
666 ts_event: 2_000_000_000.into(),
667 ts_init: 2_000_000_001.into(),
668 };
669
670 let original = vec![delta1, delta2];
671 let record_batch = OrderBookDelta::encode_batch(&metadata, &original).unwrap();
672 let decoded = OrderBookDelta::decode_batch(&metadata, record_batch).unwrap();
673
674 assert_eq!(decoded.len(), original.len());
675 for (orig, dec) in original.iter().zip(decoded.iter()) {
676 assert_eq!(dec.instrument_id, orig.instrument_id);
677 assert_eq!(dec.action, orig.action);
678 assert_eq!(dec.order.side, orig.order.side);
679 assert_eq!(dec.order.price, orig.order.price);
680 assert_eq!(dec.order.size, orig.order.size);
681 assert_eq!(dec.order.order_id, orig.order.order_id);
682 assert_eq!(dec.flags, orig.flags);
683 assert_eq!(dec.sequence, orig.sequence);
684 assert_eq!(dec.ts_event, orig.ts_event);
685 assert_eq!(dec.ts_init, orig.ts_init);
686 }
687 }
688}