Skip to main content

nautilus_testkit/itch/
parse.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! ITCH 5.0 message to [`OrderBookDelta`] conversion.
17
18use std::{io::Read, path::Path};
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23    data::{delta::OrderBookDelta, order::BookOrder},
24    enums::{BookAction, OrderSide, RecordFlag},
25    identifiers::InstrumentId,
26    types::{Price, Quantity},
27};
28
/// Decimal places carried by emitted prices: US equity quotes tick in
/// $0.0001 increments, so four places are needed.
const PRICE_PRECISION: u8 = 4;

/// Decimal places carried by emitted sizes: US equity orders are whole shares.
const SIZE_PRECISION: u8 = 0;
34
35#[derive(Debug)]
36struct OrderState {
37    price: Price,
38    size: u32,
39    side: OrderSide,
40}
41
42/// Converts a stream of ITCH 5.0 messages into [`OrderBookDelta`] events
43/// for a single instrument.
44///
45/// Maintains internal order state to compute remaining sizes after partial
46/// executions and cancellations.
47#[derive(Debug)]
48pub struct ItchParser {
49    instrument_id: InstrumentId,
50    target_locate: Option<u16>,
51    target_stock: String,
52    base_ns: u64,
53    orders: AHashMap<u64, OrderState>,
54    sequence: u64,
55}
56
57impl ItchParser {
58    /// Creates a new [`ItchParser`] for the given instrument.
59    ///
60    /// # Arguments
61    ///
62    /// - `instrument_id` - The NautilusTrader instrument ID for output deltas.
63    /// - `stock` - The ITCH stock symbol to filter for (e.g., "AAPL").
64    /// - `base_ns` - Base UNIX nanoseconds for midnight of the trading day
65    ///   (ITCH timestamps are nanoseconds since midnight).
66    pub fn new(instrument_id: InstrumentId, stock: &str, base_ns: u64) -> Self {
67        Self {
68            instrument_id,
69            target_locate: None,
70            target_stock: stock.to_string(),
71            base_ns,
72            orders: AHashMap::new(),
73            sequence: 0,
74        }
75    }
76
77    /// Parses all ITCH messages from a gzip-compressed file and returns
78    /// the filtered [`OrderBookDelta`] events.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the file cannot be opened or contains invalid data.
83    pub fn parse_gzip_file(&mut self, path: &Path) -> anyhow::Result<Vec<OrderBookDelta>> {
84        let stream = itchy::MessageStream::from_gzip(path)
85            .map_err(|e| anyhow::anyhow!("Failed to open ITCH gzip: {e}"))?;
86        self.parse_stream(stream)
87    }
88
89    /// Parses all ITCH messages from a reader and returns filtered deltas.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the stream contains invalid data.
94    pub fn parse_reader<R: Read>(&mut self, reader: R) -> anyhow::Result<Vec<OrderBookDelta>> {
95        let stream = itchy::MessageStream::from_reader(reader);
96        self.parse_stream(stream)
97    }
98
99    fn parse_stream<R: Read>(
100        &mut self,
101        stream: itchy::MessageStream<R>,
102    ) -> anyhow::Result<Vec<OrderBookDelta>> {
103        let mut deltas = Vec::new();
104
105        for result in stream {
106            let msg = result.map_err(|e| anyhow::anyhow!("ITCH parse error: {e}"))?;
107
108            // Handle feed-level messages before stock locate filtering
109            match msg.body {
110                itchy::Body::StockDirectory(ref dir) => {
111                    let symbol = dir.stock.trim();
112                    if symbol == self.target_stock {
113                        self.target_locate = Some(msg.stock_locate);
114                    }
115                    continue;
116                }
117                itchy::Body::SystemEvent {
118                    event: itchy::EventCode::EndOfMessages,
119                } => {
120                    let ts = UnixNanos::from(self.base_ns + msg.timestamp);
121                    self.handle_end_of_messages(ts, &mut deltas);
122                    continue;
123                }
124                _ => {}
125            }
126
127            // Filter by target stock locate
128            let Some(locate) = self.target_locate else {
129                continue;
130            };
131
132            if msg.stock_locate != locate {
133                continue;
134            }
135
136            let ts = UnixNanos::from(self.base_ns + msg.timestamp);
137
138            match msg.body {
139                itchy::Body::AddOrder(ref add) => {
140                    self.handle_add_order(add, ts, &mut deltas);
141                }
142                itchy::Body::DeleteOrder { reference } => {
143                    self.handle_delete_order(reference, ts, &mut deltas);
144                }
145                itchy::Body::OrderCancelled {
146                    reference,
147                    cancelled,
148                } => {
149                    self.handle_cancel(reference, cancelled, ts, &mut deltas);
150                }
151                itchy::Body::OrderExecuted {
152                    reference,
153                    executed,
154                    ..
155                } => {
156                    self.handle_execution(reference, executed, ts, &mut deltas);
157                }
158                itchy::Body::OrderExecutedWithPrice {
159                    reference,
160                    executed,
161                    ..
162                } => {
163                    self.handle_execution(reference, executed, ts, &mut deltas);
164                }
165                itchy::Body::ReplaceOrder(ref replace) => {
166                    self.handle_replace(replace, ts, &mut deltas);
167                }
168                _ => {}
169            }
170        }
171
172        // Set F_LAST on the final delta
173        if let Some(last) = deltas.last_mut() {
174            last.flags |= RecordFlag::F_LAST as u8;
175        }
176
177        Ok(deltas)
178    }
179
180    fn handle_add_order(
181        &mut self,
182        add: &itchy::AddOrder,
183        ts: UnixNanos,
184        deltas: &mut Vec<OrderBookDelta>,
185    ) {
186        let side = convert_side(add.side);
187        let price = convert_price(add.price);
188
189        self.orders.insert(
190            add.reference,
191            OrderState {
192                price,
193                size: add.shares,
194                side,
195            },
196        );
197
198        self.sequence += 1;
199        let order = BookOrder::new(
200            side,
201            price,
202            Quantity::new(f64::from(add.shares), SIZE_PRECISION),
203            add.reference,
204        );
205        deltas.push(OrderBookDelta::new(
206            self.instrument_id,
207            BookAction::Add,
208            order,
209            RecordFlag::F_LAST as u8,
210            self.sequence,
211            ts,
212            ts,
213        ));
214    }
215
216    fn handle_delete_order(
217        &mut self,
218        reference: u64,
219        ts: UnixNanos,
220        deltas: &mut Vec<OrderBookDelta>,
221    ) {
222        if let Some(state) = self.orders.remove(&reference) {
223            self.sequence += 1;
224            let order = BookOrder::new(
225                state.side,
226                state.price,
227                Quantity::new(0.0, SIZE_PRECISION),
228                reference,
229            );
230            deltas.push(OrderBookDelta::new(
231                self.instrument_id,
232                BookAction::Delete,
233                order,
234                RecordFlag::F_LAST as u8,
235                self.sequence,
236                ts,
237                ts,
238            ));
239        }
240    }
241
242    fn handle_cancel(
243        &mut self,
244        reference: u64,
245        cancelled: u32,
246        ts: UnixNanos,
247        deltas: &mut Vec<OrderBookDelta>,
248    ) {
249        if let Some(state) = self.orders.get_mut(&reference) {
250            state.size = state.size.saturating_sub(cancelled);
251
252            if state.size == 0 {
253                // Full cancel
254                let state = self.orders.remove(&reference).unwrap();
255                self.sequence += 1;
256                let order = BookOrder::new(
257                    state.side,
258                    state.price,
259                    Quantity::new(0.0, SIZE_PRECISION),
260                    reference,
261                );
262                deltas.push(OrderBookDelta::new(
263                    self.instrument_id,
264                    BookAction::Delete,
265                    order,
266                    RecordFlag::F_LAST as u8,
267                    self.sequence,
268                    ts,
269                    ts,
270                ));
271            } else {
272                // Partial cancel
273                self.sequence += 1;
274                let order = BookOrder::new(
275                    state.side,
276                    state.price,
277                    Quantity::new(f64::from(state.size), SIZE_PRECISION),
278                    reference,
279                );
280                deltas.push(OrderBookDelta::new(
281                    self.instrument_id,
282                    BookAction::Update,
283                    order,
284                    RecordFlag::F_LAST as u8,
285                    self.sequence,
286                    ts,
287                    ts,
288                ));
289            }
290        }
291    }
292
293    fn handle_execution(
294        &mut self,
295        reference: u64,
296        executed: u32,
297        ts: UnixNanos,
298        deltas: &mut Vec<OrderBookDelta>,
299    ) {
300        if let Some(state) = self.orders.get_mut(&reference) {
301            state.size = state.size.saturating_sub(executed);
302
303            if state.size == 0 {
304                // Fully consumed
305                let state = self.orders.remove(&reference).unwrap();
306                self.sequence += 1;
307                let order = BookOrder::new(
308                    state.side,
309                    state.price,
310                    Quantity::new(0.0, SIZE_PRECISION),
311                    reference,
312                );
313                deltas.push(OrderBookDelta::new(
314                    self.instrument_id,
315                    BookAction::Delete,
316                    order,
317                    RecordFlag::F_LAST as u8,
318                    self.sequence,
319                    ts,
320                    ts,
321                ));
322            } else {
323                // Partial execution
324                self.sequence += 1;
325                let order = BookOrder::new(
326                    state.side,
327                    state.price,
328                    Quantity::new(f64::from(state.size), SIZE_PRECISION),
329                    reference,
330                );
331                deltas.push(OrderBookDelta::new(
332                    self.instrument_id,
333                    BookAction::Update,
334                    order,
335                    RecordFlag::F_LAST as u8,
336                    self.sequence,
337                    ts,
338                    ts,
339                ));
340            }
341        }
342    }
343
344    fn handle_replace(
345        &mut self,
346        replace: &itchy::ReplaceOrder,
347        ts: UnixNanos,
348        deltas: &mut Vec<OrderBookDelta>,
349    ) {
350        // Delete old order
351        if let Some(old_state) = self.orders.remove(&replace.old_reference) {
352            self.sequence += 1;
353            let old_order = BookOrder::new(
354                old_state.side,
355                old_state.price,
356                Quantity::new(0.0, SIZE_PRECISION),
357                replace.old_reference,
358            );
359            deltas.push(OrderBookDelta::new(
360                self.instrument_id,
361                BookAction::Delete,
362                old_order,
363                0, // Not the last in this event group
364                self.sequence,
365                ts,
366                ts,
367            ));
368
369            // Add new order (inherits side from old order)
370            let new_price = convert_price(replace.price);
371            self.orders.insert(
372                replace.new_reference,
373                OrderState {
374                    price: new_price,
375                    size: replace.shares,
376                    side: old_state.side,
377                },
378            );
379
380            self.sequence += 1;
381            let new_order = BookOrder::new(
382                old_state.side,
383                new_price,
384                Quantity::new(f64::from(replace.shares), SIZE_PRECISION),
385                replace.new_reference,
386            );
387            deltas.push(OrderBookDelta::new(
388                self.instrument_id,
389                BookAction::Add,
390                new_order,
391                RecordFlag::F_LAST as u8,
392                self.sequence,
393                ts,
394                ts,
395            ));
396        }
397    }
398
399    fn handle_end_of_messages(&mut self, ts: UnixNanos, deltas: &mut Vec<OrderBookDelta>) {
400        self.sequence += 1;
401        deltas.push(OrderBookDelta::clear(
402            self.instrument_id,
403            self.sequence,
404            ts,
405            ts,
406        ));
407    }
408}
409
410fn convert_side(side: itchy::Side) -> OrderSide {
411    match side {
412        itchy::Side::Buy => OrderSide::Buy,
413        itchy::Side::Sell => OrderSide::Sell,
414    }
415}
416
417fn convert_price(price: itchy::Price4) -> Price {
418    Price::new(f64::from(price.raw()) / 10_000.0, PRICE_PRECISION)
419}
420
421#[cfg(test)]
422mod tests {
423    use std::{fs, fs::File, path::PathBuf, sync::Arc};
424
425    use nautilus_model::data::OrderBookDelta;
426    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
427    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
428    use rstest::rstest;
429
430    use super::*;
431
432    const AAPL_ID: &str = "AAPL.XNAS";
433
434    fn setup_parser(base_ns: u64) -> ItchParser {
435        ItchParser::new(InstrumentId::from(AAPL_ID), "AAPL", base_ns)
436    }
437
438    fn aapl_stream_with(messages: &[Vec<u8>]) -> Vec<u8> {
439        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
440        for msg in messages {
441            buf.extend_from_slice(msg);
442        }
443        buf
444    }
445
446    #[rstest]
447    fn test_convert_side() {
448        assert_eq!(convert_side(itchy::Side::Buy), OrderSide::Buy);
449        assert_eq!(convert_side(itchy::Side::Sell), OrderSide::Sell);
450    }
451
452    #[rstest]
453    fn test_convert_price() {
454        let price = convert_price(itchy::Price4::from(1_2345));
455        assert_eq!(price.as_f64(), 1.2345);
456        assert_eq!(price.precision, PRICE_PRECISION);
457    }
458
459    #[rstest]
460    fn test_convert_price_whole_dollar() {
461        let price = convert_price(itchy::Price4::from(100_0000));
462        assert_eq!(price.as_f64(), 100.0);
463    }
464
465    #[rstest]
466    fn test_convert_price_sub_penny() {
467        let price = convert_price(itchy::Price4::from(150_2501));
468        assert_eq!(price.as_f64(), 150.2501);
469    }
470
471    #[rstest]
472    fn test_add_order() {
473        let buf = aapl_stream_with(&[build_add_order_msg(1, 42, b'B', 100, 1_502_500)]);
474        let mut parser = setup_parser(0);
475        let deltas = parser.parse_reader(&buf[..]).unwrap();
476
477        assert_eq!(deltas.len(), 1);
478        assert_eq!(deltas[0].action, BookAction::Add);
479        assert_eq!(deltas[0].order.side, OrderSide::Buy);
480        assert_eq!(deltas[0].order.price.as_f64(), 150.25);
481        assert_eq!(deltas[0].order.size.as_f64(), 100.0);
482        assert_eq!(deltas[0].order.order_id, 42);
483    }
484
485    #[rstest]
486    fn test_delete_order() {
487        let buf = aapl_stream_with(&[
488            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
489            build_delete_order_msg(1, 42),
490        ]);
491        let mut parser = setup_parser(0);
492        let deltas = parser.parse_reader(&buf[..]).unwrap();
493
494        assert_eq!(deltas.len(), 2);
495        assert_eq!(deltas[1].action, BookAction::Delete);
496        assert_eq!(deltas[1].order.order_id, 42);
497        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
498    }
499
500    #[rstest]
501    fn test_delete_unknown_order_is_ignored() {
502        let buf = aapl_stream_with(&[build_delete_order_msg(1, 999)]);
503        let mut parser = setup_parser(0);
504        let deltas = parser.parse_reader(&buf[..]).unwrap();
505
506        assert_eq!(deltas.len(), 0);
507    }
508
509    #[rstest]
510    fn test_partial_cancel_reduces_size() {
511        let buf = aapl_stream_with(&[
512            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
513            build_order_cancelled_msg(1, 42, 30),
514        ]);
515        let mut parser = setup_parser(0);
516        let deltas = parser.parse_reader(&buf[..]).unwrap();
517
518        assert_eq!(deltas.len(), 2);
519        assert_eq!(deltas[1].action, BookAction::Update);
520        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
521        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
522        assert_eq!(deltas[1].order.order_id, 42);
523    }
524
525    #[rstest]
526    fn test_full_cancel_deletes_order() {
527        let buf = aapl_stream_with(&[
528            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
529            build_order_cancelled_msg(1, 42, 100),
530        ]);
531        let mut parser = setup_parser(0);
532        let deltas = parser.parse_reader(&buf[..]).unwrap();
533
534        assert_eq!(deltas.len(), 2);
535        assert_eq!(deltas[1].action, BookAction::Delete);
536        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
537    }
538
539    #[rstest]
540    fn test_cancel_unknown_order_is_ignored() {
541        let buf = aapl_stream_with(&[build_order_cancelled_msg(1, 999, 50)]);
542        let mut parser = setup_parser(0);
543        let deltas = parser.parse_reader(&buf[..]).unwrap();
544
545        assert_eq!(deltas.len(), 0);
546    }
547
548    #[rstest]
549    fn test_partial_execution_updates_size() {
550        let buf = aapl_stream_with(&[
551            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
552            build_order_executed_msg(1, 42, 40, 1001),
553        ]);
554        let mut parser = setup_parser(0);
555        let deltas = parser.parse_reader(&buf[..]).unwrap();
556
557        assert_eq!(deltas.len(), 2);
558        assert_eq!(deltas[1].action, BookAction::Update);
559        assert_eq!(deltas[1].order.size.as_f64(), 60.0);
560        assert_eq!(deltas[1].order.order_id, 42);
561    }
562
563    #[rstest]
564    fn test_full_execution_deletes_order() {
565        let buf = aapl_stream_with(&[
566            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
567            build_order_executed_msg(1, 42, 100, 1001),
568        ]);
569        let mut parser = setup_parser(0);
570        let deltas = parser.parse_reader(&buf[..]).unwrap();
571
572        assert_eq!(deltas.len(), 2);
573        assert_eq!(deltas[1].action, BookAction::Delete);
574        assert_eq!(deltas[1].order.size.as_f64(), 0.0);
575    }
576
577    #[rstest]
578    fn test_multiple_partial_executions_then_full() {
579        let buf = aapl_stream_with(&[
580            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
581            build_order_executed_msg(1, 42, 30, 1001),
582            build_order_executed_msg(1, 42, 30, 1002),
583            build_order_executed_msg(1, 42, 40, 1003),
584        ]);
585        let mut parser = setup_parser(0);
586        let deltas = parser.parse_reader(&buf[..]).unwrap();
587
588        assert_eq!(deltas.len(), 4);
589        assert_eq!(deltas[1].action, BookAction::Update);
590        assert_eq!(deltas[1].order.size.as_f64(), 70.0);
591        assert_eq!(deltas[2].action, BookAction::Update);
592        assert_eq!(deltas[2].order.size.as_f64(), 40.0);
593        assert_eq!(deltas[3].action, BookAction::Delete);
594        assert_eq!(deltas[3].order.size.as_f64(), 0.0);
595    }
596
597    #[rstest]
598    fn test_executed_with_price_partial() {
599        let buf = aapl_stream_with(&[
600            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
601            build_order_executed_with_price_msg(1, 42, 25, 2001, 1_505_000),
602        ]);
603        let mut parser = setup_parser(0);
604        let deltas = parser.parse_reader(&buf[..]).unwrap();
605
606        assert_eq!(deltas.len(), 2);
607        assert_eq!(deltas[1].action, BookAction::Update);
608        assert_eq!(deltas[1].order.size.as_f64(), 75.0);
609        // Book order retains the resting price, not the execution price
610        assert_eq!(deltas[1].order.price.as_f64(), 150.0);
611    }
612
613    #[rstest]
614    fn test_execution_unknown_order_is_ignored() {
615        let buf = aapl_stream_with(&[build_order_executed_msg(1, 999, 50, 1001)]);
616        let mut parser = setup_parser(0);
617        let deltas = parser.parse_reader(&buf[..]).unwrap();
618
619        assert_eq!(deltas.len(), 0);
620    }
621
622    #[rstest]
623    fn test_replace_order() {
624        let buf = aapl_stream_with(&[
625            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
626            build_replace_order_msg(1, 42, 43, 150, 1_510_000),
627        ]);
628        let mut parser = setup_parser(0);
629        let deltas = parser.parse_reader(&buf[..]).unwrap();
630
631        assert_eq!(deltas.len(), 3);
632        assert_eq!(deltas[1].action, BookAction::Delete);
633        assert_eq!(deltas[1].order.order_id, 42);
634        assert_eq!(deltas[2].action, BookAction::Add);
635        assert_eq!(deltas[2].order.order_id, 43);
636        assert_eq!(deltas[2].order.price.as_f64(), 151.0);
637        assert_eq!(deltas[2].order.size.as_f64(), 150.0);
638        assert_eq!(deltas[2].order.side, OrderSide::Buy);
639    }
640
641    #[rstest]
642    fn test_replace_inherits_side_from_original() {
643        let buf = aapl_stream_with(&[
644            build_add_order_msg(1, 42, b'S', 100, 1_500_000),
645            build_replace_order_msg(1, 42, 43, 200, 1_490_000),
646        ]);
647        let mut parser = setup_parser(0);
648        let deltas = parser.parse_reader(&buf[..]).unwrap();
649
650        assert_eq!(deltas[2].order.side, OrderSide::Sell);
651    }
652
653    #[rstest]
654    fn test_replace_delete_has_no_f_last_flag() {
655        // The delete in a replace pair is not the last event in the group
656        let buf = aapl_stream_with(&[
657            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
658            build_replace_order_msg(1, 42, 43, 100, 1_510_000),
659        ]);
660        let mut parser = setup_parser(0);
661        let deltas = parser.parse_reader(&buf[..]).unwrap();
662
663        assert_eq!(deltas[1].flags, 0);
664        assert_ne!(deltas[2].flags & RecordFlag::F_LAST as u8, 0);
665    }
666
667    #[rstest]
668    fn test_replace_unknown_order_is_ignored() {
669        let buf = aapl_stream_with(&[build_replace_order_msg(1, 999, 1000, 100, 1_500_000)]);
670        let mut parser = setup_parser(0);
671        let deltas = parser.parse_reader(&buf[..]).unwrap();
672
673        assert_eq!(deltas.len(), 0);
674    }
675
676    #[rstest]
677    fn test_end_of_messages_emits_clear() {
678        let buf = aapl_stream_with(&[
679            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
680            // EndOfMessages uses locate=0 (feed-level, not stock-specific)
681            build_system_event_msg(0, b'C'),
682        ]);
683        let mut parser = setup_parser(0);
684        let deltas = parser.parse_reader(&buf[..]).unwrap();
685
686        assert_eq!(deltas.len(), 2);
687        assert_eq!(deltas[1].action, BookAction::Clear);
688    }
689
690    #[rstest]
691    fn test_end_of_messages_with_different_locate() {
692        // Regression: EndOfMessages must be processed regardless of locate code
693        let buf = aapl_stream_with(&[
694            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
695            build_system_event_msg(99, b'C'),
696        ]);
697        let mut parser = setup_parser(0);
698        let deltas = parser.parse_reader(&buf[..]).unwrap();
699
700        assert_eq!(deltas.len(), 2);
701        assert_eq!(deltas[1].action, BookAction::Clear);
702    }
703
704    #[rstest]
705    fn test_filters_by_stock_locate() {
706        let mut buf = build_stock_directory_msg(1, b"AAPL    ");
707        buf.extend_from_slice(&build_stock_directory_msg(2, b"MSFT    "));
708        buf.extend_from_slice(&build_add_order_msg(2, 10, b'B', 50, 3_000_000));
709        buf.extend_from_slice(&build_add_order_msg(1, 11, b'S', 200, 1_500_000));
710
711        let mut parser = setup_parser(0);
712        let deltas = parser.parse_reader(&buf[..]).unwrap();
713
714        assert_eq!(deltas.len(), 1);
715        assert_eq!(deltas[0].order.order_id, 11);
716    }
717
718    #[rstest]
719    fn test_messages_before_directory_are_ignored() {
720        let mut buf = Vec::new();
721        // AddOrder arrives before any StockDirectory
722        buf.extend_from_slice(&build_add_order_msg(1, 42, b'B', 100, 1_500_000));
723        buf.extend_from_slice(&build_stock_directory_msg(1, b"AAPL    "));
724        buf.extend_from_slice(&build_add_order_msg(1, 43, b'B', 100, 1_500_000));
725
726        let mut parser = setup_parser(0);
727        let deltas = parser.parse_reader(&buf[..]).unwrap();
728
729        // Only the second add (after directory) should be captured
730        assert_eq!(deltas.len(), 1);
731        assert_eq!(deltas[0].order.order_id, 43);
732    }
733
734    #[rstest]
735    fn test_timestamp_offset_from_midnight() {
736        let base_ns: u64 = 1_548_806_400_000_000_000; // 2019-01-30 midnight UTC
737        let itch_ts: u64 = 34_200_000_000_000; // 9:30 AM (ns since midnight)
738        let buf = aapl_stream_with(&[build_add_order_msg_with_ts(
739            1, 42, b'B', 100, 1_500_000, itch_ts,
740        )]);
741        let mut parser = setup_parser(base_ns);
742        let deltas = parser.parse_reader(&buf[..]).unwrap();
743
744        assert_eq!(deltas[0].ts_event, UnixNanos::from(base_ns + itch_ts));
745        assert_eq!(deltas[0].ts_init, deltas[0].ts_event);
746    }
747
748    #[rstest]
749    fn test_f_last_set_on_final_delta() {
750        let buf = aapl_stream_with(&[
751            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
752            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
753        ]);
754        let mut parser = setup_parser(0);
755        let deltas = parser.parse_reader(&buf[..]).unwrap();
756
757        assert_eq!(deltas.len(), 2);
758        assert_ne!(deltas[1].flags & RecordFlag::F_LAST as u8, 0);
759    }
760
761    #[rstest]
762    fn test_sequence_numbers_are_monotonic() {
763        let buf = aapl_stream_with(&[
764            build_add_order_msg(1, 42, b'B', 100, 1_500_000),
765            build_order_executed_msg(1, 42, 50, 1001),
766            build_add_order_msg(1, 43, b'S', 200, 1_510_000),
767            build_delete_order_msg(1, 43),
768        ]);
769        let mut parser = setup_parser(0);
770        let deltas = parser.parse_reader(&buf[..]).unwrap();
771
772        for i in 1..deltas.len() {
773            assert!(deltas[i].sequence > deltas[i - 1].sequence);
774        }
775    }
776
777    #[rstest]
778    fn test_empty_stream() {
779        let buf: &[u8] = &[];
780        let mut parser = setup_parser(0);
781        let deltas = parser.parse_reader(buf).unwrap();
782
783        assert_eq!(deltas.len(), 0);
784    }
785
786    fn build_msg(tag: u8, stock_locate: u16, timestamp: u64, body: &[u8]) -> Vec<u8> {
787        let msg_len = (1 + 2 + 2 + 6 + body.len()) as u16;
788        let mut buf = Vec::new();
789        buf.extend_from_slice(&msg_len.to_be_bytes());
790        buf.push(tag);
791        buf.extend_from_slice(&stock_locate.to_be_bytes());
792        buf.extend_from_slice(&0u16.to_be_bytes()); // tracking_number
793        // 6-byte timestamp (big-endian u48)
794        buf.push((timestamp >> 40) as u8);
795        buf.push((timestamp >> 32) as u8);
796        buf.push((timestamp >> 24) as u8);
797        buf.push((timestamp >> 16) as u8);
798        buf.push((timestamp >> 8) as u8);
799        buf.push(timestamp as u8);
800        buf.extend_from_slice(body);
801        buf
802    }
803
804    fn build_stock_directory_msg(locate: u16, stock: &[u8; 8]) -> Vec<u8> {
805        let mut body = Vec::new();
806        body.extend_from_slice(stock);
807        body.push(b'Q'); // market_category
808        body.push(b'N'); // financial_status
809        body.extend_from_slice(&100u32.to_be_bytes()); // round_lot_size
810        body.push(b'Y'); // round_lots_only
811        body.push(b'C'); // issue_classification
812        body.extend_from_slice(b"C "); // issue_subtype
813        body.push(b'P'); // authenticity
814        body.push(b'N'); // short_sale_threshold
815        body.push(b'N'); // ipo_flag
816        body.push(b'1'); // luld_ref_price_tier
817        body.push(b'N'); // etp_flag
818        body.extend_from_slice(&0u32.to_be_bytes()); // etp_leverage_factor
819        body.push(b'N'); // inverse_indicator
820        build_msg(b'R', locate, 0, &body)
821    }
822
823    fn build_add_order_msg(
824        locate: u16,
825        reference: u64,
826        side: u8,
827        shares: u32,
828        price: u32,
829    ) -> Vec<u8> {
830        build_add_order_msg_with_ts(locate, reference, side, shares, price, 0)
831    }
832
833    fn build_add_order_msg_with_ts(
834        locate: u16,
835        reference: u64,
836        side: u8,
837        shares: u32,
838        price: u32,
839        timestamp: u64,
840    ) -> Vec<u8> {
841        let mut body = Vec::new();
842        body.extend_from_slice(&reference.to_be_bytes());
843        body.push(side);
844        body.extend_from_slice(&shares.to_be_bytes());
845        body.extend_from_slice(b"AAPL    ");
846        body.extend_from_slice(&price.to_be_bytes());
847        build_msg(b'A', locate, timestamp, &body)
848    }
849
850    fn build_delete_order_msg(locate: u16, reference: u64) -> Vec<u8> {
851        build_msg(b'D', locate, 0, &reference.to_be_bytes())
852    }
853
854    fn build_order_cancelled_msg(locate: u16, reference: u64, cancelled: u32) -> Vec<u8> {
855        let mut body = Vec::new();
856        body.extend_from_slice(&reference.to_be_bytes());
857        body.extend_from_slice(&cancelled.to_be_bytes());
858        build_msg(b'X', locate, 0, &body)
859    }
860
861    fn build_order_executed_msg(
862        locate: u16,
863        reference: u64,
864        executed: u32,
865        match_number: u64,
866    ) -> Vec<u8> {
867        let mut body = Vec::new();
868        body.extend_from_slice(&reference.to_be_bytes());
869        body.extend_from_slice(&executed.to_be_bytes());
870        body.extend_from_slice(&match_number.to_be_bytes());
871        build_msg(b'E', locate, 0, &body)
872    }
873
874    fn build_order_executed_with_price_msg(
875        locate: u16,
876        reference: u64,
877        executed: u32,
878        match_number: u64,
879        price: u32,
880    ) -> Vec<u8> {
881        let mut body = Vec::new();
882        body.extend_from_slice(&reference.to_be_bytes());
883        body.extend_from_slice(&executed.to_be_bytes());
884        body.extend_from_slice(&match_number.to_be_bytes());
885        body.push(b'Y'); // printable
886        body.extend_from_slice(&price.to_be_bytes());
887        build_msg(b'C', locate, 0, &body)
888    }
889
890    fn build_replace_order_msg(
891        locate: u16,
892        old_reference: u64,
893        new_reference: u64,
894        shares: u32,
895        price: u32,
896    ) -> Vec<u8> {
897        let mut body = Vec::new();
898        body.extend_from_slice(&old_reference.to_be_bytes());
899        body.extend_from_slice(&new_reference.to_be_bytes());
900        body.extend_from_slice(&shares.to_be_bytes());
901        body.extend_from_slice(&price.to_be_bytes());
902        build_msg(b'U', locate, 0, &body)
903    }
904
905    fn build_system_event_msg(locate: u16, event_code: u8) -> Vec<u8> {
906        build_msg(b'S', locate, 0, &[event_code])
907    }
908
909    // Curates AAPL L3 deltas from NASDAQ ITCH 5.0 binary into NautilusTrader Parquet.
910    // Download source: https://emi.nasdaq.com/ITCH/Nasdaq%20ITCH/01302019.NASDAQ_ITCH50.gz
911    // Run: cargo test -p nautilus-testkit --lib test_curate_aapl_itch -- --ignored --nocapture
912    #[rstest]
913    #[ignore = "one-time dataset curation, not for routine CI"]
914    fn test_curate_aapl_itch() {
915        let itch_path = PathBuf::from("/tmp/01302019.NASDAQ_ITCH50.gz");
916        let instrument_id = InstrumentId::from("AAPL.XNAS");
917
918        // 2019-01-30 midnight EST (UTC-5) as Unix nanoseconds
919        let base_ns: u64 = 1_548_824_400_000_000_000;
920        let parquet_path = "/tmp/itch_AAPL.XNAS_2019-01-30_deltas.parquet";
921
922        println!("Parsing ITCH from {}", itch_path.display());
923        let mut parser = ItchParser::new(instrument_id, "AAPL", base_ns);
924        let deltas = parser.parse_gzip_file(&itch_path).unwrap();
925        let count = deltas.len();
926        println!("Parsed {count} deltas for AAPL");
927
928        let metadata =
929            OrderBookDelta::get_metadata(&instrument_id, PRICE_PRECISION, SIZE_PRECISION);
930        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
931
932        println!("Writing Parquet to {parquet_path}");
933        let file = File::create(parquet_path).unwrap();
934        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
935        let props = WriterProperties::builder()
936            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
937            .set_max_row_group_size(1_000_000)
938            .build();
939        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
940
941        let chunk_size = 1_000_000;
942        for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
943            println!("  Encoding chunk {} ({} records)...", i + 1, chunk.len());
944            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
945            writer.write(&batch).unwrap();
946        }
947        writer.close().unwrap();
948
949        let file_size = fs::metadata(parquet_path).unwrap().len();
950        println!("\nRecords: {count}");
951        println!("Price precision: {PRICE_PRECISION}");
952        println!("Size precision: {SIZE_PRECISION}");
953        println!(
954            "File size: {} bytes ({:.1} MB)",
955            file_size,
956            file_size as f64 / 1_048_576.0
957        );
958        println!("Output: {parquet_path}");
959        println!("\nNext steps:");
960        println!("  sha256sum {parquet_path}");
961    }
962}