1use std::{io::Read, path::Path};
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23 data::{delta::OrderBookDelta, order::BookOrder},
24 enums::{BookAction, OrderSide, RecordFlag},
25 identifiers::InstrumentId,
26 types::{Price, Quantity},
27};
28
/// Decimal places carried by converted prices: ITCH `Price4` values are integers
/// scaled by 10^4, so four decimals represent them exactly.
const PRICE_PRECISION: u8 = 4;

/// Decimal places carried by order sizes; ITCH share counts are whole numbers.
const SIZE_PRECISION: u8 = 0;
34
35#[derive(Debug)]
36struct OrderState {
37 price: Price,
38 size: u32,
39 side: OrderSide,
40}
41
42#[derive(Debug)]
48pub struct ItchParser {
49 instrument_id: InstrumentId,
50 target_locate: Option<u16>,
51 target_stock: String,
52 base_ns: u64,
53 orders: AHashMap<u64, OrderState>,
54 sequence: u64,
55}
56
57impl ItchParser {
58 pub fn new(instrument_id: InstrumentId, stock: &str, base_ns: u64) -> Self {
67 Self {
68 instrument_id,
69 target_locate: None,
70 target_stock: stock.to_string(),
71 base_ns,
72 orders: AHashMap::new(),
73 sequence: 0,
74 }
75 }
76
77 pub fn parse_gzip_file(&mut self, path: &Path) -> anyhow::Result<Vec<OrderBookDelta>> {
84 let stream = itchy::MessageStream::from_gzip(path)
85 .map_err(|e| anyhow::anyhow!("Failed to open ITCH gzip: {e}"))?;
86 self.parse_stream(stream)
87 }
88
89 pub fn parse_reader<R: Read>(&mut self, reader: R) -> anyhow::Result<Vec<OrderBookDelta>> {
95 let stream = itchy::MessageStream::from_reader(reader);
96 self.parse_stream(stream)
97 }
98
99 fn parse_stream<R: Read>(
100 &mut self,
101 stream: itchy::MessageStream<R>,
102 ) -> anyhow::Result<Vec<OrderBookDelta>> {
103 let mut deltas = Vec::new();
104
105 for result in stream {
106 let msg = result.map_err(|e| anyhow::anyhow!("ITCH parse error: {e}"))?;
107
108 match msg.body {
110 itchy::Body::StockDirectory(ref dir) => {
111 let symbol = dir.stock.trim();
112 if symbol == self.target_stock {
113 self.target_locate = Some(msg.stock_locate);
114 }
115 continue;
116 }
117 itchy::Body::SystemEvent {
118 event: itchy::EventCode::EndOfMessages,
119 } => {
120 let ts = UnixNanos::from(self.base_ns + msg.timestamp);
121 self.handle_end_of_messages(ts, &mut deltas);
122 continue;
123 }
124 _ => {}
125 }
126
127 let Some(locate) = self.target_locate else {
129 continue;
130 };
131
132 if msg.stock_locate != locate {
133 continue;
134 }
135
136 let ts = UnixNanos::from(self.base_ns + msg.timestamp);
137
138 match msg.body {
139 itchy::Body::AddOrder(ref add) => {
140 self.handle_add_order(add, ts, &mut deltas);
141 }
142 itchy::Body::DeleteOrder { reference } => {
143 self.handle_delete_order(reference, ts, &mut deltas);
144 }
145 itchy::Body::OrderCancelled {
146 reference,
147 cancelled,
148 } => {
149 self.handle_cancel(reference, cancelled, ts, &mut deltas);
150 }
151 itchy::Body::OrderExecuted {
152 reference,
153 executed,
154 ..
155 } => {
156 self.handle_execution(reference, executed, ts, &mut deltas);
157 }
158 itchy::Body::OrderExecutedWithPrice {
159 reference,
160 executed,
161 ..
162 } => {
163 self.handle_execution(reference, executed, ts, &mut deltas);
164 }
165 itchy::Body::ReplaceOrder(ref replace) => {
166 self.handle_replace(replace, ts, &mut deltas);
167 }
168 _ => {}
169 }
170 }
171
172 if let Some(last) = deltas.last_mut() {
174 last.flags |= RecordFlag::F_LAST as u8;
175 }
176
177 Ok(deltas)
178 }
179
180 fn handle_add_order(
181 &mut self,
182 add: &itchy::AddOrder,
183 ts: UnixNanos,
184 deltas: &mut Vec<OrderBookDelta>,
185 ) {
186 let side = convert_side(add.side);
187 let price = convert_price(add.price);
188
189 self.orders.insert(
190 add.reference,
191 OrderState {
192 price,
193 size: add.shares,
194 side,
195 },
196 );
197
198 self.sequence += 1;
199 let order = BookOrder::new(
200 side,
201 price,
202 Quantity::new(f64::from(add.shares), SIZE_PRECISION),
203 add.reference,
204 );
205 deltas.push(OrderBookDelta::new(
206 self.instrument_id,
207 BookAction::Add,
208 order,
209 RecordFlag::F_LAST as u8,
210 self.sequence,
211 ts,
212 ts,
213 ));
214 }
215
216 fn handle_delete_order(
217 &mut self,
218 reference: u64,
219 ts: UnixNanos,
220 deltas: &mut Vec<OrderBookDelta>,
221 ) {
222 if let Some(state) = self.orders.remove(&reference) {
223 self.sequence += 1;
224 let order = BookOrder::new(
225 state.side,
226 state.price,
227 Quantity::new(0.0, SIZE_PRECISION),
228 reference,
229 );
230 deltas.push(OrderBookDelta::new(
231 self.instrument_id,
232 BookAction::Delete,
233 order,
234 RecordFlag::F_LAST as u8,
235 self.sequence,
236 ts,
237 ts,
238 ));
239 }
240 }
241
242 fn handle_cancel(
243 &mut self,
244 reference: u64,
245 cancelled: u32,
246 ts: UnixNanos,
247 deltas: &mut Vec<OrderBookDelta>,
248 ) {
249 if let Some(state) = self.orders.get_mut(&reference) {
250 state.size = state.size.saturating_sub(cancelled);
251
252 if state.size == 0 {
253 let state = self.orders.remove(&reference).unwrap();
255 self.sequence += 1;
256 let order = BookOrder::new(
257 state.side,
258 state.price,
259 Quantity::new(0.0, SIZE_PRECISION),
260 reference,
261 );
262 deltas.push(OrderBookDelta::new(
263 self.instrument_id,
264 BookAction::Delete,
265 order,
266 RecordFlag::F_LAST as u8,
267 self.sequence,
268 ts,
269 ts,
270 ));
271 } else {
272 self.sequence += 1;
274 let order = BookOrder::new(
275 state.side,
276 state.price,
277 Quantity::new(f64::from(state.size), SIZE_PRECISION),
278 reference,
279 );
280 deltas.push(OrderBookDelta::new(
281 self.instrument_id,
282 BookAction::Update,
283 order,
284 RecordFlag::F_LAST as u8,
285 self.sequence,
286 ts,
287 ts,
288 ));
289 }
290 }
291 }
292
293 fn handle_execution(
294 &mut self,
295 reference: u64,
296 executed: u32,
297 ts: UnixNanos,
298 deltas: &mut Vec<OrderBookDelta>,
299 ) {
300 if let Some(state) = self.orders.get_mut(&reference) {
301 state.size = state.size.saturating_sub(executed);
302
303 if state.size == 0 {
304 let state = self.orders.remove(&reference).unwrap();
306 self.sequence += 1;
307 let order = BookOrder::new(
308 state.side,
309 state.price,
310 Quantity::new(0.0, SIZE_PRECISION),
311 reference,
312 );
313 deltas.push(OrderBookDelta::new(
314 self.instrument_id,
315 BookAction::Delete,
316 order,
317 RecordFlag::F_LAST as u8,
318 self.sequence,
319 ts,
320 ts,
321 ));
322 } else {
323 self.sequence += 1;
325 let order = BookOrder::new(
326 state.side,
327 state.price,
328 Quantity::new(f64::from(state.size), SIZE_PRECISION),
329 reference,
330 );
331 deltas.push(OrderBookDelta::new(
332 self.instrument_id,
333 BookAction::Update,
334 order,
335 RecordFlag::F_LAST as u8,
336 self.sequence,
337 ts,
338 ts,
339 ));
340 }
341 }
342 }
343
344 fn handle_replace(
345 &mut self,
346 replace: &itchy::ReplaceOrder,
347 ts: UnixNanos,
348 deltas: &mut Vec<OrderBookDelta>,
349 ) {
350 if let Some(old_state) = self.orders.remove(&replace.old_reference) {
352 self.sequence += 1;
353 let old_order = BookOrder::new(
354 old_state.side,
355 old_state.price,
356 Quantity::new(0.0, SIZE_PRECISION),
357 replace.old_reference,
358 );
359 deltas.push(OrderBookDelta::new(
360 self.instrument_id,
361 BookAction::Delete,
362 old_order,
363 0, self.sequence,
365 ts,
366 ts,
367 ));
368
369 let new_price = convert_price(replace.price);
371 self.orders.insert(
372 replace.new_reference,
373 OrderState {
374 price: new_price,
375 size: replace.shares,
376 side: old_state.side,
377 },
378 );
379
380 self.sequence += 1;
381 let new_order = BookOrder::new(
382 old_state.side,
383 new_price,
384 Quantity::new(f64::from(replace.shares), SIZE_PRECISION),
385 replace.new_reference,
386 );
387 deltas.push(OrderBookDelta::new(
388 self.instrument_id,
389 BookAction::Add,
390 new_order,
391 RecordFlag::F_LAST as u8,
392 self.sequence,
393 ts,
394 ts,
395 ));
396 }
397 }
398
399 fn handle_end_of_messages(&mut self, ts: UnixNanos, deltas: &mut Vec<OrderBookDelta>) {
400 self.sequence += 1;
401 deltas.push(OrderBookDelta::clear(
402 self.instrument_id,
403 self.sequence,
404 ts,
405 ts,
406 ));
407 }
408}
409
410fn convert_side(side: itchy::Side) -> OrderSide {
411 match side {
412 itchy::Side::Buy => OrderSide::Buy,
413 itchy::Side::Sell => OrderSide::Sell,
414 }
415}
416
417fn convert_price(price: itchy::Price4) -> Price {
418 Price::new(f64::from(price.raw()) / 10_000.0, PRICE_PRECISION)
419}
420
#[cfg(test)]
mod tests {
    use std::{fs, fs::File, path::PathBuf, sync::Arc};

    use nautilus_model::data::OrderBookDelta;
    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
    use rstest::rstest;

    use super::*;

    const AAPL_ID: &str = "AAPL.XNAS";

    /// Builds a parser that follows AAPL using the given timestamp base.
    fn setup_parser(base_ns: u64) -> ItchParser {
        ItchParser::new(InstrumentId::from(AAPL_ID), "AAPL", base_ns)
    }

    /// Prefixes `messages` with a Stock Directory entry mapping locate 1 to AAPL.
    fn aapl_stream_with(messages: &[Vec<u8>]) -> Vec<u8> {
        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
        for msg in messages {
            buf.extend_from_slice(msg);
        }
        buf
    }

    #[rstest]
    fn test_convert_side() {
        assert_eq!(convert_side(itchy::Side::Buy), OrderSide::Buy);
        assert_eq!(convert_side(itchy::Side::Sell), OrderSide::Sell);
    }

    #[rstest]
    fn test_convert_price() {
        let price = convert_price(itchy::Price4::from(1_2345));
        assert_eq!(price.as_f64(), 1.2345);
        assert_eq!(price.precision, PRICE_PRECISION);
    }

    #[rstest]
    fn test_convert_price_whole_dollar() {
        let price = convert_price(itchy::Price4::from(100_0000));
        assert_eq!(price.as_f64(), 100.0);
    }

    #[rstest]
    fn test_convert_price_sub_penny() {
        let price = convert_price(itchy::Price4::from(150_2501));
        assert_eq!(price.as_f64(), 150.2501);
    }

    #[rstest]
    fn test_add_order() {
        let buf = aapl_stream_with(&[build_add_order_msg(1, 42, b'B', 100, 1_502_500)]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].action, BookAction::Add);
        assert_eq!(deltas[0].order.side, OrderSide::Buy);
        assert_eq!(deltas[0].order.price.as_f64(), 150.25);
        assert_eq!(deltas[0].order.size.as_f64(), 100.0);
        assert_eq!(deltas[0].order.order_id, 42);
    }

    #[rstest]
    fn test_delete_order() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_delete_order_msg(1, 42),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.order_id, 42);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_delete_unknown_order_is_ignored() {
        let buf = aapl_stream_with(&[build_delete_order_msg(1, 999)]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_partial_cancel_reduces_size() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_cancelled_msg(1, 42, 30),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Update);
        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
        assert_eq!(deltas[1].order.order_id, 42);
    }

    #[rstest]
    fn test_full_cancel_deletes_order() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_order_cancelled_msg(1, 42, 100),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_over_cancel_clamps_and_deletes() {
        // Cancelling more shares than rest must clamp to zero, not underflow.
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_cancelled_msg(1, 42, 500),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_cancel_unknown_order_is_ignored() {
        let buf = aapl_stream_with(&[build_order_cancelled_msg(1, 999, 50)]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_partial_execution_updates_size() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 40, 1001),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Update);
        assert_eq!(deltas[1].order.size.as_f64(), 60.0);
        assert_eq!(deltas[1].order.order_id, 42);
    }

    #[rstest]
    fn test_full_execution_deletes_order() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_order_executed_msg(1, 42, 100, 1001),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_multiple_partial_executions_then_full() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 30, 1001),
            build_order_executed_msg(1, 42, 30, 1002),
            build_order_executed_msg(1, 42, 40, 1003),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 4);
        assert_eq!(deltas[1].action, BookAction::Update);
        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
        assert_eq!(deltas[2].action, BookAction::Update);
        assert_eq!(deltas[2].order.size.as_f64(), 40.0);
        assert_eq!(deltas[3].action, BookAction::Delete);
        assert_eq!(deltas[3].order.size.as_f64(), 0.0);
    }

    #[rstest]
    fn test_executed_with_price_partial() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_with_price_msg(1, 42, 25, 2001, 1_505_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Update);
        assert_eq!(deltas[1].order.size.as_f64(), 75.0);
        // The resting price is kept; the execution price does not move the order.
        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
    }

    #[rstest]
    fn test_executed_with_price_full_deletes_order() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_order_executed_with_price_msg(1, 42, 100, 2001, 1_505_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
    }

    #[rstest]
    fn test_execution_unknown_order_is_ignored() {
        let buf = aapl_stream_with(&[build_order_executed_msg(1, 999, 50, 1001)]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_replace_order() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 150, 1_510_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 3);
        assert_eq!(deltas[1].action, BookAction::Delete);
        assert_eq!(deltas[1].order.order_id, 42);
        assert_eq!(deltas[2].action, BookAction::Add);
        assert_eq!(deltas[2].order.order_id, 43);
        assert_eq!(deltas[2].order.price.as_f64(), 151.0);
        assert_eq!(deltas[2].order.size.as_f64(), 150.0);
        assert_eq!(deltas[2].order.side, OrderSide::Buy);
    }

    #[rstest]
    fn test_replace_inherits_side_from_original() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 200, 1_490_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas[2].order.side, OrderSide::Sell);
    }

    #[rstest]
    fn test_replace_delete_has_no_f_last_flag() {
        // The Delete/Add pair of a replace forms one event: only the Add is terminated.
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_replace_order_msg(1, 42, 43, 100, 1_510_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas[1].flags, 0);
        assert_ne!(deltas[2].flags & RecordFlag::F_LAST as u8, 0);
    }

    #[rstest]
    fn test_replace_unknown_order_is_ignored() {
        let buf = aapl_stream_with(&[build_replace_order_msg(1, 999, 1000, 100, 1_500_000)]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    #[rstest]
    fn test_end_of_messages_emits_clear() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_system_event_msg(0, b'C'),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Clear);
    }

    #[rstest]
    fn test_end_of_messages_with_different_locate() {
        // End of Messages is honoured whatever locate the system event carries.
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_system_event_msg(99, b'C'),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[1].action, BookAction::Clear);
    }

    #[rstest]
    fn test_filters_by_stock_locate() {
        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
        buf.extend_from_slice(&build_stock_directory_msg(2, b"MSFT    "));
        buf.extend_from_slice(&build_add_order_msg(2, 10, b'B', 50, 3_000_000));
        buf.extend_from_slice(&build_add_order_msg(1, 11, b'S', 200, 1_500_000));

        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].order.order_id, 11);
    }

    #[rstest]
    fn test_messages_before_directory_are_ignored() {
        let mut buf = Vec::new();
        // Arrives before the AAPL locate is known, so it must be dropped.
        buf.extend_from_slice(&build_add_order_msg(1, 42, b'B', 100, 1_500_000));
        buf.extend_from_slice(&build_stock_directory_msg(1, b"AAPL    "));
        buf.extend_from_slice(&build_add_order_msg(1, 43, b'B', 100, 1_500_000));

        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].order.order_id, 43);
    }

    #[rstest]
    fn test_timestamp_offset_from_midnight() {
        let base_ns: u64 = 1_548_806_400_000_000_000; // 2019-01-30 00:00:00 UTC
        let itch_ts: u64 = 34_200_000_000_000; // 09:30:00 after midnight
        let buf = aapl_stream_with(&[build_add_order_msg_with_ts(
            1, 42, b'B', 100, 1_500_000, itch_ts,
        )]);
        let mut parser = setup_parser(base_ns);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas[0].ts_event, UnixNanos::from(base_ns + itch_ts));
        assert_eq!(deltas[0].ts_init, deltas[0].ts_event);
    }

    #[rstest]
    fn test_f_last_set_on_final_delta() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        assert_eq!(deltas.len(), 2);
        assert_ne!(deltas[1].flags & RecordFlag::F_LAST as u8, 0);
    }

    #[rstest]
    fn test_sequence_numbers_are_monotonic() {
        let buf = aapl_stream_with(&[
            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
            build_order_executed_msg(1, 42, 50, 1001),
            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
            build_delete_order_msg(1, 43),
        ]);
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(&buf[..]).unwrap();

        for pair in deltas.windows(2) {
            assert!(pair[1].sequence > pair[0].sequence);
        }
    }

    #[rstest]
    fn test_empty_stream() {
        let buf: &[u8] = &[];
        let mut parser = setup_parser(0);
        let deltas = parser.parse_reader(buf).unwrap();

        assert_eq!(deltas.len(), 0);
    }

    /// Frames one ITCH message. The frame is a 2-byte big-endian length, then the
    /// type tag, stock locate, tracking number (always 0 here), a 6-byte big-endian
    /// timestamp, and finally `body`.
    fn build_msg(tag: u8, stock_locate: u16, timestamp: u64, body: &[u8]) -> Vec<u8> {
        let msg_len = (1 + 2 + 2 + 6 + body.len()) as u16;
        let mut buf = Vec::with_capacity(2 + usize::from(msg_len));
        buf.extend_from_slice(&msg_len.to_be_bytes());
        buf.push(tag);
        buf.extend_from_slice(&stock_locate.to_be_bytes());
        buf.extend_from_slice(&0u16.to_be_bytes());
        // Low 48 bits of the timestamp, most significant byte first.
        buf.extend_from_slice(&timestamp.to_be_bytes()[2..]);
        buf.extend_from_slice(body);
        buf
    }

    /// Builds a Stock Directory ('R') message. `stock` is the space-padded 8-byte symbol.
    fn build_stock_directory_msg(locate: u16, stock: &[u8; 8]) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(stock);
        body.push(b'Q'); // Market category
        body.push(b'N'); // Financial status indicator
        body.extend_from_slice(&100u32.to_be_bytes()); // Round lot size
        body.push(b'Y'); // Round lots only
        body.push(b'C'); // Issue classification
        body.extend_from_slice(b"C "); // Issue sub-type (2 bytes)
        body.push(b'P'); // Authenticity
        body.push(b'N'); // Short sale threshold indicator
        body.push(b'N'); // IPO flag
        body.push(b'1'); // LULD reference price tier
        body.push(b'N'); // ETP flag
        body.extend_from_slice(&0u32.to_be_bytes()); // ETP leverage factor
        body.push(b'N'); // Inverse indicator
        build_msg(b'R', locate, 0, &body)
    }

    /// Builds an Add Order ('A') message for AAPL with timestamp 0.
    fn build_add_order_msg(
        locate: u16,
        reference: u64,
        side: u8,
        shares: u32,
        price: u32,
    ) -> Vec<u8> {
        build_add_order_msg_with_ts(locate, reference, side, shares, price, 0)
    }

    /// Builds an Add Order ('A') message for AAPL. `price` is in 1/10,000 units.
    fn build_add_order_msg_with_ts(
        locate: u16,
        reference: u64,
        side: u8,
        shares: u32,
        price: u32,
        timestamp: u64,
    ) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&reference.to_be_bytes());
        body.push(side);
        body.extend_from_slice(&shares.to_be_bytes());
        body.extend_from_slice(b"AAPL    "); // 8-byte space-padded stock field
        body.extend_from_slice(&price.to_be_bytes());
        build_msg(b'A', locate, timestamp, &body)
    }

    /// Builds an Order Delete ('D') message.
    fn build_delete_order_msg(locate: u16, reference: u64) -> Vec<u8> {
        build_msg(b'D', locate, 0, &reference.to_be_bytes())
    }

    /// Builds an Order Cancel ('X') message removing `cancelled` shares.
    fn build_order_cancelled_msg(locate: u16, reference: u64, cancelled: u32) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&reference.to_be_bytes());
        body.extend_from_slice(&cancelled.to_be_bytes());
        build_msg(b'X', locate, 0, &body)
    }

    /// Builds an Order Executed ('E') message.
    fn build_order_executed_msg(
        locate: u16,
        reference: u64,
        executed: u32,
        match_number: u64,
    ) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&reference.to_be_bytes());
        body.extend_from_slice(&executed.to_be_bytes());
        body.extend_from_slice(&match_number.to_be_bytes());
        build_msg(b'E', locate, 0, &body)
    }

    /// Builds an Order Executed With Price ('C') message flagged printable.
    fn build_order_executed_with_price_msg(
        locate: u16,
        reference: u64,
        executed: u32,
        match_number: u64,
        price: u32,
    ) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&reference.to_be_bytes());
        body.extend_from_slice(&executed.to_be_bytes());
        body.extend_from_slice(&match_number.to_be_bytes());
        body.push(b'Y'); // Printable
        body.extend_from_slice(&price.to_be_bytes());
        build_msg(b'C', locate, 0, &body)
    }

    /// Builds an Order Replace ('U') message.
    fn build_replace_order_msg(
        locate: u16,
        old_reference: u64,
        new_reference: u64,
        shares: u32,
        price: u32,
    ) -> Vec<u8> {
        let mut body = Vec::new();
        body.extend_from_slice(&old_reference.to_be_bytes());
        body.extend_from_slice(&new_reference.to_be_bytes());
        body.extend_from_slice(&shares.to_be_bytes());
        body.extend_from_slice(&price.to_be_bytes());
        build_msg(b'U', locate, 0, &body)
    }

    /// Builds a System Event ('S') message with the given event code.
    fn build_system_event_msg(locate: u16, event_code: u8) -> Vec<u8> {
        build_msg(b'S', locate, 0, &[event_code])
    }

    /// Converts a full-day NASDAQ ITCH capture into an AAPL delta Parquet file.
    ///
    /// Expects the source file at `/tmp/01302019.NASDAQ_ITCH50.gz`; run manually.
    #[rstest]
    #[ignore = "one-time dataset curation, not for routine CI"]
    fn test_curate_aapl_itch() {
        let itch_path = PathBuf::from("/tmp/01302019.NASDAQ_ITCH50.gz");
        let instrument_id = InstrumentId::from("AAPL.XNAS");

        // 2019-01-30 05:00:00 UTC, i.e. midnight US/Eastern on the trading day.
        let base_ns: u64 = 1_548_824_400_000_000_000;
        let parquet_path = "/tmp/itch_AAPL.XNAS_2019-01-30_deltas.parquet";

        println!("Parsing ITCH from {}", itch_path.display());
        let mut parser = ItchParser::new(instrument_id, "AAPL", base_ns);
        let deltas = parser.parse_gzip_file(&itch_path).unwrap();
        let count = deltas.len();
        println!("Parsed {count} deltas for AAPL");

        let metadata =
            OrderBookDelta::get_metadata(&instrument_id, PRICE_PRECISION, SIZE_PRECISION);
        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));

        println!("Writing Parquet to {parquet_path}");
        let file = File::create(parquet_path).unwrap();
        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
            .set_max_row_group_size(1_000_000)
            .build();
        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();

        let chunk_size = 1_000_000;
        for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
            println!("  Encoding chunk {} ({} records)...", i + 1, chunk.len());
            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let file_size = fs::metadata(parquet_path).unwrap().len();
        println!("\nRecords: {count}");
        println!("Price precision: {PRICE_PRECISION}");
        println!("Size precision: {SIZE_PRECISION}");
        println!(
            "File size: {} bytes ({:.1} MB)",
            file_size,
            file_size as f64 / 1_048_576.0
        );
        println!("Output: {parquet_path}");
        println!("\nNext steps:");
        println!("  sha256sum {parquet_path}");
    }
}