Skip to main content

nautilus_backtest/
data_iterator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Multi-stream, time-ordered data iterator for replaying historical market data.
17
18use std::collections::BinaryHeap;
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
/// Pending merge-heap entry pointing at the next unconsumed element of one stream.
///
/// Entries compare by `(ts, priority, index)` with the order reversed (see the `Ord` impl),
/// so [`BinaryHeap`] pops the earliest timestamp first and breaks timestamp ties in favour
/// of the lowest stream priority.
#[derive(Debug, Eq, PartialEq)]
struct HeapEntry {
    /// `ts_init` of the element at `index`.
    ts: UnixNanos,
    /// Priority key of the stream this entry belongs to.
    priority: i32,
    /// Position of the element within that stream's vector.
    index: usize,
}
31
32impl Ord for HeapEntry {
33    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34        // min-heap on ts, then priority sign (+/-) then index
35        self.ts
36            .cmp(&other.ts)
37            .then_with(|| self.priority.cmp(&other.priority))
38            .then_with(|| self.index.cmp(&other.index))
39            .reverse() // BinaryHeap is max by default -> reverse for min behaviour
40    }
41}
42
43impl PartialOrd for HeapEntry {
44    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
/// Multi-stream, time-ordered data iterator used by the backtest engine.
///
/// Each named stream is keyed internally by a signed priority. The priority is the key of
/// the data, name and cursor maps, and it breaks ties between elements that share a
/// timestamp (lower value first). When the heap is rebuilt with more than one stream,
/// elements are merged through a heap holding one pending entry per stream; with exactly
/// one stream the heap is bypassed.
#[derive(Debug, Default)]
pub struct BacktestDataIterator {
    /// Stream data keyed by priority; each vector is sorted by `ts_init` on insertion.
    streams: AHashMap<i32, Vec<Data>>,
    /// Priority -> stream name.
    names: AHashMap<i32, String>,
    /// Stream name -> priority.
    priorities: AHashMap<String, i32>,
    /// Per-stream cursor (keyed by priority): index of the next element to yield.
    indices: AHashMap<i32, usize>,
    /// Pending entries for the multi-stream merge; left empty in single-stream mode.
    heap: BinaryHeap<HeapEntry>,
    /// `Some(priority)` enables the heap-free fast path in `next`; set when a heap rebuild
    /// finds exactly one stream.
    single_priority: Option<i32>,
    /// Monotonically increasing counter used to assign priorities to new streams.
    next_priority_counter: i32,
}
60
61impl BacktestDataIterator {
62    /// Creates a new empty [`BacktestDataIterator`].
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            streams: AHashMap::new(),
67            names: AHashMap::new(),
68            priorities: AHashMap::new(),
69            indices: AHashMap::new(),
70            heap: BinaryHeap::new(),
71            single_priority: None,
72            next_priority_counter: 0,
73        }
74    }
75
76    /// Adds (or replaces) a named data stream.
77    ///
78    /// When `append_data` is true the stream gets lower priority on timestamp
79    /// ties; when false (prepend) it wins ties.
80    pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
81        if data.is_empty() {
82            return;
83        }
84
85        // Ensure sorted by ts_init
86        data.sort_by_key(HasTsInit::ts_init);
87
88        let priority = if let Some(p) = self.priorities.get(name) {
89            // Replace existing stream – remove previous traces then re-insert below.
90            *p
91        } else {
92            self.next_priority_counter += 1;
93            let sign = if append_data { 1 } else { -1 };
94            sign * self.next_priority_counter
95        };
96
97        // Remove old state if any
98        self.remove_data(name, true);
99
100        self.streams.insert(priority, data);
101        self.names.insert(priority, name.to_string());
102        self.priorities.insert(name.to_string(), priority);
103        self.indices.insert(priority, 0);
104
105        self.rebuild_heap();
106    }
107
108    /// Removes a named data stream.
109    pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110        if let Some(priority) = self.priorities.remove(name) {
111            self.streams.remove(&priority);
112            self.indices.remove(&priority);
113            self.names.remove(&priority);
114
115            // Rebuild heap sans removed priority
116            self.heap.retain(|e| e.priority != priority);
117
118            if self.heap.is_empty() {
119                self.single_priority = None;
120            }
121        }
122        if complete_remove {
123            // Placeholder for future generator cleanup
124        }
125    }
126
127    /// Sets the cursor of a named stream to `index` (0-based).
128    pub fn set_index(&mut self, name: &str, index: usize) {
129        if let Some(priority) = self.priorities.get(name) {
130            self.indices.insert(*priority, index);
131            self.rebuild_heap();
132        }
133    }
134
135    /// Resets all stream cursors to the beginning.
136    pub fn reset_all_cursors(&mut self) {
137        for idx in self.indices.values_mut() {
138            *idx = 0;
139        }
140        self.rebuild_heap();
141    }
142
143    /// Returns the next [`Data`] element across all streams in chronological order.
144    #[allow(clippy::should_implement_trait)]
145    pub fn next(&mut self) -> Option<Data> {
146        // Fast path for single stream
147        if let Some(p) = self.single_priority {
148            let data = self.streams.get_mut(&p)?;
149            let idx = self.indices.get_mut(&p)?;
150            if *idx >= data.len() {
151                return None;
152            }
153            let element = data[*idx].clone();
154            *idx += 1;
155            return Some(element);
156        }
157
158        // Multi-stream path using heap
159        let entry = self.heap.pop()?;
160        let stream_vec = self.streams.get(&entry.priority)?;
161        let element = stream_vec[entry.index].clone();
162
163        // Advance cursor and push next entry
164        let next_index = entry.index + 1;
165        self.indices.insert(entry.priority, next_index);
166        if next_index < stream_vec.len() {
167            self.heap.push(HeapEntry {
168                ts: stream_vec[next_index].ts_init(),
169                priority: entry.priority,
170                index: next_index,
171            });
172        }
173
174        Some(element)
175    }
176
177    /// Returns whether all streams have been fully consumed.
178    #[must_use]
179    pub fn is_done(&self) -> bool {
180        if let Some(p) = self.single_priority {
181            if let Some(idx) = self.indices.get(&p)
182                && let Some(vec) = self.streams.get(&p)
183            {
184                return *idx >= vec.len();
185            }
186            true
187        } else {
188            self.heap.is_empty()
189        }
190    }
191
192    fn rebuild_heap(&mut self) {
193        self.heap.clear();
194
195        // Determine if we’re in single-stream mode
196        if self.streams.len() == 1 {
197            self.single_priority = self.streams.keys().next().copied();
198            return;
199        }
200        self.single_priority = None;
201
202        for (&priority, vec) in &self.streams {
203            let idx = *self.indices.get(&priority).unwrap_or(&0);
204            if idx < vec.len() {
205                self.heap.push(HeapEntry {
206                    ts: vec[idx].ts_init(),
207                    priority,
208                    index: idx,
209                });
210            }
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a `1.0`/`1.0` quote for instrument `id`, stamped with `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains `it`, returning the `ts_init` of every yielded element in order.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        let mut ts = Vec::new();
        while let Some(d) = it.next() {
            ts.push(d.ts_init().as_u64());
        }
        ts
    }

    /// Drains `it`, returning the instrument ID of every yielded element in order.
    fn collect_ids(it: &mut BacktestDataIterator) -> Vec<InstrumentId> {
        let mut out = Vec::new();
        while let Some(d) = it.next() {
            out.push(d.instrument_id());
        }
        out
    }

    /// Converts string literals into instrument IDs for order assertions.
    fn ids(values: &[&str]) -> Vec<InstrumentId> {
        values.iter().map(|v| InstrumentId::from(*v)).collect()
    }

    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 100), quote("A.B", 200), quote("A.B", 300)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 3)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 300), quote("A.B", 100), quote("A.B", 200)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
    }

    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_three_stream_merge_sorted() {
        let mut it = BacktestDataIterator::new();
        let data_len = 5;
        let d0: Vec<Data> = (0..data_len).map(|k| quote("A.B", 3 * k)).collect();
        let d1: Vec<Data> = (0..data_len).map(|k| quote("C.D", 3 * k + 1)).collect();
        let d2: Vec<Data> = (0..data_len).map(|k| quote("E.F", 3 * k + 2)).collect();
        it.add_data("d0", d0, true);
        it.add_data("d1", d1, true);
        it.add_data("d2", d2, true);

        let ts = collect_ts(&mut it);
        assert_eq!(ts.len(), 15);
        // The three interleaved streams cover 0..15 exactly once each.
        assert_eq!(ts, (0..3 * data_len).collect::<Vec<u64>>());
    }

    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 100), quote("A.B", 300)], true);
        it.add_data("s2", vec![quote("C.D", 200), quote("C.D", 400)], true);

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300, 400]);
    }

    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Both at the same timestamp: FIFO order (a before b)
        assert_eq!(collect_ids(&mut it), ids(&["A.B", "C.D"]));
    }

    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut it = BacktestDataIterator::new();
        // "a" is appended (lower priority), "b" is prepended (higher priority)
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], false);

        // "b" (prepend) should come first despite being added second
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        // Prepend stream (negative priority) wins ties over append (positive)
        assert_eq!(first.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(second.instrument_id(), InstrumentId::from("A.B"));
    }

    #[rstest]
    fn test_later_prepend_wins_over_earlier_prepend() {
        let mut it = BacktestDataIterator::new();
        it.add_data("p1", vec![quote("A.B", 100)], false);
        it.add_data("p2", vec![quote("C.D", 100)], false);

        // The most recently prepended stream comes first on ties
        assert_eq!(collect_ids(&mut it), ids(&["C.D", "A.B"]));
    }

    #[rstest]
    fn test_is_done_empty_iterator() {
        let it = BacktestDataIterator::new();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_after_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1)], true);
        it.add_data("s2", vec![quote("C.D", 2)], true);

        assert!(!it.is_done());
        it.next();
        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![
                quote("A.B", 0),
                quote("A.B", 1),
                quote("A.B", 2),
                quote("A.B", 3),
            ],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 0);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        let remaining = collect_ts(&mut it);
        assert_eq!(remaining, vec![2, 3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_add_stream_after_partial_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        // Switching from single- to multi-stream mode keeps s1's cursor
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![2, 3, 4]);
    }

    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        it.remove_data("a", false);

        assert_eq!(collect_ts(&mut it), vec![2]);
    }

    #[rstest]
    fn test_remove_stream_mid_iteration() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("b", vec![quote("C.D", 2), quote("C.D", 4)], true);

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);
        it.remove_data("b", false);

        assert_eq!(collect_ts(&mut it), vec![3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut it = BacktestDataIterator::new();
        it.add_data("x", vec![quote("A.B", 1)], true);
        it.add_data("y", vec![quote("C.D", 2)], true);

        it.remove_data("x", false);
        it.remove_data("y", false);

        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);

        it.remove_data("s", true);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_then_readd_gets_fresh_priority() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        it.remove_data("a", false);
        it.add_data("a", vec![quote("A.B", 100)], true);

        // "a" was re-added after "b", so it now loses the tie
        assert_eq!(collect_ids(&mut it), ids(&["C.D", "A.B"]));
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 10);

        it.set_index("s", 0);

        assert_eq!(collect_ts(&mut it), vec![10, 20, 30]);
    }

    #[rstest]
    fn test_set_index_skips_forward() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        it.set_index("s", 2);

        assert_eq!(collect_ts(&mut it), vec![30]);
    }

    #[rstest]
    fn test_set_index_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        it.set_index("s1", 1);

        assert_eq!(collect_ts(&mut it), vec![2, 3, 4]);
    }

    #[rstest]
    fn test_set_index_past_end_exhausts_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        it.set_index("s", 10);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("X", vec![quote("A.B", 10)], true);

        assert_eq!(collect_ts(&mut it), vec![10]);
    }

    #[rstest]
    fn test_replacing_stream_resets_cursor() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        it.add_data("X", vec![quote("A.B", 5), quote("A.B", 6)], true);

        assert_eq!(collect_ts(&mut it), vec![5, 6]);
    }

    #[rstest]
    fn test_replacing_stream_keeps_original_priority() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Re-adding "a" keeps the priority from its first insertion, so it still wins the tie
        it.add_data("a", vec![quote("E.F", 100)], true);

        assert_eq!(collect_ids(&mut it), ids(&["E.F", "C.D"]));
    }

    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", vec![], true);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_add_empty_data_keeps_existing_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        it.add_data("s", vec![], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut it = BacktestDataIterator::new();
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut it = BacktestDataIterator::new();
        it.add_data("batch_0", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("batch_1", vec![quote("A.B", 2), quote("A.B", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_prepend_stream_always_wins_ties_across_batches() {
        // Verifies that a prepend stream (negative priority) wins ties
        // even when added after multiple append streams
        let mut it = BacktestDataIterator::new();
        it.add_data("append_a", vec![quote("A.B", 100)], true);
        it.add_data("append_b", vec![quote("C.D", 100)], true);
        it.add_data("prepend", vec![quote("E.F", 100)], false);

        assert_eq!(
            collect_ids(&mut it),
            ids(&["E.F", "A.B", "C.D"]),
            "Prepend stream should always come first in ties"
        );
    }

    #[rstest]
    fn test_equal_timestamps_across_many_streams_preserves_priority_order() {
        // All items at the same timestamp — ordering is strictly by priority
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 50)], true);
        it.add_data("s2", vec![quote("C.D", 50)], true);
        it.add_data("s3", vec![quote("E.F", 50)], true);
        it.add_data("s4", vec![quote("G.H", 50)], true);

        // All yielded exactly once, in the order the append streams were added
        assert_eq!(
            collect_ids(&mut it),
            ids(&["A.B", "C.D", "E.F", "G.H"])
        );
    }
}
574        assert!(ids.contains(&InstrumentId::from("C.D")));
575        assert!(ids.contains(&InstrumentId::from("E.F")));
576        assert!(ids.contains(&InstrumentId::from("G.H")));
577    }
578}