par_stream/index_stream.rs
1use crate::common::*;
2
/// An extension trait for [Stream](futures::stream::Stream) types whose items are
/// `(index, item)` pairs, providing combinators that manipulate item ordering.
pub trait IndexStreamExt
where
    Self: Stream<Item = (usize, Self::IndexedItem)>,
{
    /// The payload type that travels alongside each index number.
    type IndexedItem;

    /// Reorders the input items `(index, item)` by ascending index number and yields
    /// only the `item` part.
    ///
    /// It is meant to be combined with [enumerate()](futures::StreamExt::enumerate)
    /// and parallel unordered tasks: tag the items with their position, process them
    /// out of order, then restore the original order with this method.
    ///
    /// The index numbers must start from zero, be unique and contiguous. An item whose
    /// index is ahead of the next expected index is kept in an internal buffer until
    /// all of its predecessors have been yielded. If an index is skipped (including
    /// index zero), every later item stays buffered and nothing more is yielded; on an
    /// input stream that never ends, the output therefore stalls while the buffer
    /// keeps growing.
    ///
    /// # Panics
    /// - An index number appears more than once.
    /// - The input stream ends while buffered items are still waiting for a missing
    ///   index number.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// let doubled: Vec<_> = stream::iter(0..1000)
    ///     // add index number
    ///     .enumerate()
    ///     // double the values in parallel
    ///     .par_then_unordered(None, move |(index, value)| {
    ///         // the closure is sent to parallel worker
    ///         async move { (index, value * 2) }
    ///     })
    ///     // add one to each value in parallel
    ///     .par_then_unordered(None, move |(index, value)| {
    ///         // the closure is sent to parallel worker
    ///         async move { (index, value + 1) }
    ///     })
    ///     // reorder the values according to index number
    ///     .reorder_enumerated()
    ///     .collect()
    ///     .await;
    /// let expect: Vec<_> = (0..1000).map(|value| value * 2 + 1).collect();
    /// assert_eq!(doubled, expect);
    /// # })
    /// ```
    fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>;
}
49
50impl<S, T> IndexStreamExt for S
51where
52 S: Stream<Item = (usize, T)>,
53{
54 type IndexedItem = T;
55
56 fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem> {
57 ReorderEnumerated {
58 commit: 0,
59 buffer: HashMap::new(),
60 stream: self,
61 }
62 }
63}
64
65// reorder_enumerated
66
67pub use reorder_enumerated::*;
68
69mod reorder_enumerated {
70 use super::*;
71
72 /// Stream for the [reorder_enumerated](IndexStreamExt::reorder_enumerated) method.
73 #[derive(Derivative)]
74 #[derivative(Debug)]
75 #[pin_project]
76 pub struct ReorderEnumerated<S, T>
77 where
78 S: ?Sized,
79 {
80 pub(super) commit: usize,
81 pub(super) buffer: HashMap<usize, T>,
82 #[pin]
83 pub(super) stream: S,
84 }
85
86 impl<S, T> Stream for ReorderEnumerated<S, T>
87 where
88 S: Stream<Item = (usize, T)>,
89 {
90 type Item = T;
91
92 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
93 let mut this = self.project();
94
95 Ready(loop {
96 if let Some(item) = this.buffer.remove(&*this.commit) {
97 *this.commit += 1;
98 break Some(item);
99 } else {
100 match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
101 Some((index, item)) => match (*this.commit).cmp(&index) {
102 Less => {
103 let prev = this.buffer.insert(index, item);
104 assert!(
105 prev.is_none(),
106 "the index number {} appears more than once",
107 index
108 );
109 }
110 Equal => {
111 *this.commit += 1;
112 break Some(item);
113 }
114 Greater => {
115 panic!("the index number {} appears more than once", index);
116 }
117 },
118 None => {
119 assert!(
120 this.buffer.is_empty(),
121 "the item for index number {} is missing",
122 this.commit
123 );
124 break None;
125 }
126 }
127 }
128 })
129 }
130 }
131}