// git_features/parallel/in_order.rs
use std::{cmp::Ordering, collections::BTreeMap};

/// The position of an item within its original sequence.
///
/// [`InOrderIter`] expects ids to start at 0 and to increase by one per item, without gaps
/// or duplicates.
pub type SequenceId = usize;

/// An iterator adapter that puts items back into sequence order.
///
/// The wrapped iterator yields `Ok((id, item))` pairs in any order. Items that arrive ahead of
/// their turn are buffered, and the `item`s are handed out strictly by ascending `id`,
/// starting at 0.
pub struct InOrderIter<T, I> {
    /// The iterator yielding the possibly out-of-order items along with their sequence id.
    pub inner: I,
    /// Items that arrived before their turn, keyed by their sequence id.
    store: BTreeMap<SequenceId, T>,
    /// The sequence id of the item that has to be yielded next.
    next_chunk: SequenceId,
    /// Set once `inner` produced an error; from then on `next()` only returns `None`.
    is_done: bool,
}

impl<T, E, I> From<I> for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    /// Wrap `iter` with an empty buffer, expecting the first item in sequence to carry id 0.
    fn from(iter: I) -> Self {
        InOrderIter {
            inner: iter,
            store: BTreeMap::new(),
            next_chunk: 0,
            is_done: false,
        }
    }
}

impl<T, E, I> Iterator for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    type Item = Result<T, E>;

    /// Yield the item whose sequence id is due next, pulling from `inner` until it is available.
    ///
    /// `inner` is polled before the buffer is consulted, so an error coming out of `inner` is
    /// returned right away: buffered items are dropped and every following call returns `None`.
    /// Once `inner` is exhausted, buffered items are drained in order.
    ///
    /// # Panics
    ///
    /// - If `inner` yields a sequence id that is smaller than the one due next.
    /// - If `inner` yields a sequence id that is already buffered.
    /// - In builds with debug assertions, if `inner` is exhausted while buffered items remain
    ///   that cannot be yielded because the id due next is not among them.
    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        loop {
            match self.inner.next() {
                Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
                    Ordering::Equal => {
                        self.next_chunk += 1;
                        return Some(Ok(v));
                    }
                    Ordering::Less => {
                        unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
                    }
                    Ordering::Greater => {
                        // Too early for this one, keep it until its turn has come.
                        let previous = self.store.insert(c, v);
                        assert!(
                            previous.is_none(),
                            "Chunks are returned only once, input is an invalid sequence"
                        );
                        // An item buffered during an earlier call may be the one due now.
                        if let Some(v) = self.store.remove(&self.next_chunk) {
                            self.next_chunk += 1;
                            return Some(Ok(v));
                        }
                        // Otherwise keep pulling from `inner`.
                    }
                },
                Some(Err(e)) => {
                    // Errors end the iteration: forget buffered items and refuse further calls.
                    self.is_done = true;
                    self.store.clear();
                    return Some(Err(e));
                }
                None => match self.store.remove(&self.next_chunk) {
                    // `inner` is exhausted, drain what was buffered one item per call.
                    Some(v) => {
                        self.next_chunk += 1;
                        return Some(Ok(v));
                    }
                    None => {
                        debug_assert!(
                            self.store.is_empty(),
                            "When iteration is done we should not have stored items left"
                        );
                        return None;
                    }
                },
            }
        }
    }
}