1use std::collections::BinaryHeap;
19
20use ahash::AHashMap;
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
24#[derive(Debug, Eq, PartialEq)]
26struct HeapEntry {
27 ts: UnixNanos,
28 priority: i32,
29 index: usize,
30}
31
32impl Ord for HeapEntry {
33 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34 self.ts
36 .cmp(&other.ts)
37 .then_with(|| self.priority.cmp(&other.priority))
38 .then_with(|| self.index.cmp(&other.index))
39 .reverse() }
41}
42
43impl PartialOrd for HeapEntry {
44 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49#[derive(Debug, Default)]
51pub struct BacktestDataIterator {
52 streams: AHashMap<i32, Vec<Data>>, names: AHashMap<i32, String>, priorities: AHashMap<String, i32>, indices: AHashMap<i32, usize>, heap: BinaryHeap<HeapEntry>,
57 single_priority: Option<i32>,
58 next_priority_counter: i32, }
60
61impl BacktestDataIterator {
62 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 streams: AHashMap::new(),
67 names: AHashMap::new(),
68 priorities: AHashMap::new(),
69 indices: AHashMap::new(),
70 heap: BinaryHeap::new(),
71 single_priority: None,
72 next_priority_counter: 0,
73 }
74 }
75
76 pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
81 if data.is_empty() {
82 return;
83 }
84
85 data.sort_by_key(HasTsInit::ts_init);
87
88 let priority = if let Some(p) = self.priorities.get(name) {
89 *p
91 } else {
92 self.next_priority_counter += 1;
93 let sign = if append_data { 1 } else { -1 };
94 sign * self.next_priority_counter
95 };
96
97 self.remove_data(name, true);
99
100 self.streams.insert(priority, data);
101 self.names.insert(priority, name.to_string());
102 self.priorities.insert(name.to_string(), priority);
103 self.indices.insert(priority, 0);
104
105 self.rebuild_heap();
106 }
107
108 pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110 if let Some(priority) = self.priorities.remove(name) {
111 self.streams.remove(&priority);
112 self.indices.remove(&priority);
113 self.names.remove(&priority);
114
115 self.heap.retain(|e| e.priority != priority);
117
118 if self.heap.is_empty() {
119 self.single_priority = None;
120 }
121 }
122 if complete_remove {
123 }
125 }
126
127 pub fn set_index(&mut self, name: &str, index: usize) {
129 if let Some(priority) = self.priorities.get(name) {
130 self.indices.insert(*priority, index);
131 self.rebuild_heap();
132 }
133 }
134
135 pub fn reset_all_cursors(&mut self) {
137 for idx in self.indices.values_mut() {
138 *idx = 0;
139 }
140 self.rebuild_heap();
141 }
142
143 #[allow(clippy::should_implement_trait)]
145 pub fn next(&mut self) -> Option<Data> {
146 if let Some(p) = self.single_priority {
148 let data = self.streams.get_mut(&p)?;
149 let idx = self.indices.get_mut(&p)?;
150 if *idx >= data.len() {
151 return None;
152 }
153 let element = data[*idx].clone();
154 *idx += 1;
155 return Some(element);
156 }
157
158 let entry = self.heap.pop()?;
160 let stream_vec = self.streams.get(&entry.priority)?;
161 let element = stream_vec[entry.index].clone();
162
163 let next_index = entry.index + 1;
165 self.indices.insert(entry.priority, next_index);
166 if next_index < stream_vec.len() {
167 self.heap.push(HeapEntry {
168 ts: stream_vec[next_index].ts_init(),
169 priority: entry.priority,
170 index: next_index,
171 });
172 }
173
174 Some(element)
175 }
176
177 #[must_use]
179 pub fn is_done(&self) -> bool {
180 if let Some(p) = self.single_priority {
181 if let Some(idx) = self.indices.get(&p)
182 && let Some(vec) = self.streams.get(&p)
183 {
184 return *idx >= vec.len();
185 }
186 true
187 } else {
188 self.heap.is_empty()
189 }
190 }
191
192 fn rebuild_heap(&mut self) {
193 self.heap.clear();
194
195 if self.streams.len() == 1 {
197 self.single_priority = self.streams.keys().next().copied();
198 return;
199 }
200 self.single_priority = None;
201
202 for (&priority, vec) in &self.streams {
203 let idx = *self.indices.get(&priority).unwrap_or(&0);
204 if idx < vec.len() {
205 self.heap.push(HeapEntry {
206 ts: vec[idx].ts_init(),
207 priority,
208 index: idx,
209 });
210 }
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for `id` whose `ts_event` and `ts_init` are both `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains `it`, returning the `ts_init` of every yielded element.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        let mut ts = Vec::new();
        while let Some(d) = it.next() {
            ts.push(d.ts_init().as_u64());
        }
        ts
    }

    /// Drains `it`, returning the instrument ID of every yielded element.
    fn collect_ids(it: &mut BacktestDataIterator) -> Vec<InstrumentId> {
        let mut ids = Vec::new();
        while let Some(d) = it.next() {
            ids.push(d.instrument_id());
        }
        ids
    }

    /// Converts raw instrument ID strings into `InstrumentId`s.
    fn ids_of(raw: &[&str]) -> Vec<InstrumentId> {
        raw.iter().map(|s| InstrumentId::from(*s)).collect()
    }

    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 100), quote("A.B", 200), quote("A.B", 300)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 3)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 300), quote("A.B", 100), quote("A.B", 200)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
    }

    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_three_stream_merge_sorted() {
        let mut it = BacktestDataIterator::new();
        let data_len: u64 = 5;
        let d0: Vec<Data> = (0..data_len).map(|k| quote("A.B", 3 * k)).collect();
        let d1: Vec<Data> = (0..data_len).map(|k| quote("C.D", 3 * k + 1)).collect();
        let d2: Vec<Data> = (0..data_len).map(|k| quote("E.F", 3 * k + 2)).collect();
        it.add_data("d0", d0, true);
        it.add_data("d1", d1, true);
        it.add_data("d2", d2, true);

        // The three streams interleave perfectly, so the merge is 0..15.
        let expected: Vec<u64> = (0..3 * data_len).collect();
        assert_eq!(collect_ts(&mut it), expected);
    }

    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 100), quote("A.B", 300)], true);
        it.add_data("s2", vec![quote("C.D", 200), quote("C.D", 400)], true);

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300, 400]);
    }

    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Appended streams win ties in the order they were added.
        assert_eq!(collect_ids(&mut it), ids_of(&["A.B", "C.D"]));
    }

    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], false);

        let first = it.next().unwrap();
        let second = it.next().unwrap();
        assert_eq!(first.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(second.instrument_id(), InstrumentId::from("A.B"));
    }

    #[rstest]
    fn test_later_prepend_wins_ties_over_earlier_prepend() {
        let mut it = BacktestDataIterator::new();
        it.add_data("p1", vec![quote("A.B", 100)], false);
        it.add_data("p2", vec![quote("C.D", 100)], false);

        assert_eq!(collect_ids(&mut it), ids_of(&["C.D", "A.B"]));
    }

    #[rstest]
    fn test_is_done_empty_iterator() {
        let it = BacktestDataIterator::new();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_after_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1)], true);
        it.add_data("s2", vec![quote("C.D", 2)], true);

        assert!(!it.is_done());
        it.next();
        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![
                quote("A.B", 0),
                quote("A.B", 1),
                quote("A.B", 2),
                quote("A.B", 3),
            ],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 0);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        let remaining = collect_ts(&mut it);
        assert_eq!(remaining, vec![2, 3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        it.remove_data("a", false);

        assert_eq!(collect_ts(&mut it), vec![2]);
    }

    #[rstest]
    fn test_remove_down_to_single_stream_resumes_from_cursor() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "a",
            vec![quote("A.B", 1), quote("A.B", 3), quote("A.B", 5)],
            true,
        );
        it.add_data("b", vec![quote("C.D", 2)], true);

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);
        it.remove_data("b", false);

        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![3, 5]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut it = BacktestDataIterator::new();
        it.add_data("x", vec![quote("A.B", 1)], true);
        it.add_data("y", vec![quote("C.D", 2)], true);

        it.remove_data("x", false);
        it.remove_data("y", false);

        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);

        it.remove_data("s", true);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 10);

        it.set_index("s", 0);

        assert_eq!(collect_ts(&mut it), vec![10, 20, 30]);
    }

    #[rstest]
    fn test_set_index_skips_forward() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        it.set_index("s", 2);

        assert_eq!(collect_ts(&mut it), vec![30]);
    }

    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("X", vec![quote("A.B", 10)], true);

        assert_eq!(collect_ts(&mut it), vec![10]);
    }

    #[rstest]
    fn test_readding_data_keeps_original_priority() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);
        // Re-adding "a" with `append_data = false` must not change its priority.
        it.add_data("a", vec![quote("A.B", 100)], false);

        assert_eq!(collect_ids(&mut it), ids_of(&["A.B", "C.D"]));
    }

    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", vec![], true);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_add_empty_data_keeps_existing_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);
        it.add_data("s", vec![], true);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut it = BacktestDataIterator::new();
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut it = BacktestDataIterator::new();
        it.add_data("batch_0", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("batch_1", vec![quote("A.B", 2), quote("A.B", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_prepend_stream_always_wins_ties_across_batches() {
        let mut it = BacktestDataIterator::new();
        it.add_data("append_a", vec![quote("A.B", 100)], true);
        it.add_data("append_b", vec![quote("C.D", 100)], true);
        it.add_data("prepend", vec![quote("E.F", 100)], false);

        let first = it.next().unwrap();
        assert_eq!(
            first.instrument_id(),
            InstrumentId::from("E.F"),
            "Prepend stream should always come first in ties"
        );
        // The appended streams follow in the order they were added.
        assert_eq!(collect_ids(&mut it), ids_of(&["A.B", "C.D"]));
    }

    #[rstest]
    fn test_equal_timestamps_across_many_streams_preserves_priority_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 50)], true);
        it.add_data("s2", vec![quote("C.D", 50)], true);
        it.add_data("s3", vec![quote("E.F", 50)], true);
        it.add_data("s4", vec![quote("G.H", 50)], true);

        assert_eq!(
            collect_ids(&mut it),
            ids_of(&["A.B", "C.D", "E.F", "G.H"])
        );
    }
}