par_stream/
index_stream.rs

1use crate::common::*;
2
3/// The trait extends [Stream](futures::stream::Stream) types with ordering manipulation combinators.
4pub trait IndexStreamExt
5where
6    Self: Stream<Item = (usize, Self::IndexedItem)>,
7{
8    type IndexedItem;
9
10    /// Reorders the input items `(index, item)` according to the index number and returns `item`.
11    ///
12    /// It can be combined with [enumerate()](futures::StreamExt::enumerate) and parallel
13    /// unordered tasks.
14    ///
15    /// The index numbers must start from zero, be unique and contiguous. Index not starting
16    /// from zero causes the stream to hang indefinitely.
17    ///
18    /// # Panics
19    /// The repeating of an index will cause the stream to panic.
20    ///
21    /// ```rust
22    /// # par_stream::rt::block_on_executor(async move {
23    /// use futures::prelude::*;
24    /// use par_stream::prelude::*;
25    ///
26    /// let doubled: Vec<_> = stream::iter(0..1000)
27    ///     // add index number
28    ///     .enumerate()
29    ///     // double the values in parallel
30    ///     .par_then_unordered(None, move |(index, value)| {
31    ///         // the closure is sent to parallel worker
32    ///         async move { (index, value * 2) }
33    ///     })
34    ///     // add values by one in parallel
35    ///     .par_then_unordered(None, move |(index, value)| {
36    ///         // the closure is sent to parallel worker
37    ///         async move { (index, value + 1) }
38    ///     })
39    ///     // reorder the values according to index number
40    ///     .reorder_enumerated()
41    ///     .collect()
42    ///     .await;
43    /// let expect: Vec<_> = (0..1000).map(|value| value * 2 + 1).collect();
44    /// assert_eq!(doubled, expect);
45    /// # })
46    /// ```
47    fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem>;
48}
49
50impl<S, T> IndexStreamExt for S
51where
52    S: Stream<Item = (usize, T)>,
53{
54    type IndexedItem = T;
55
56    fn reorder_enumerated(self) -> ReorderEnumerated<Self, Self::IndexedItem> {
57        ReorderEnumerated {
58            commit: 0,
59            buffer: HashMap::new(),
60            stream: self,
61        }
62    }
63}
64
65// reorder_enumerated
66
67pub use reorder_enumerated::*;
68
69mod reorder_enumerated {
70    use super::*;
71
72    /// Stream for the [reorder_enumerated](IndexStreamExt::reorder_enumerated) method.
73    #[derive(Derivative)]
74    #[derivative(Debug)]
75    #[pin_project]
76    pub struct ReorderEnumerated<S, T>
77    where
78        S: ?Sized,
79    {
80        pub(super) commit: usize,
81        pub(super) buffer: HashMap<usize, T>,
82        #[pin]
83        pub(super) stream: S,
84    }
85
86    impl<S, T> Stream for ReorderEnumerated<S, T>
87    where
88        S: Stream<Item = (usize, T)>,
89    {
90        type Item = T;
91
92        fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
93            let mut this = self.project();
94
95            Ready(loop {
96                if let Some(item) = this.buffer.remove(&*this.commit) {
97                    *this.commit += 1;
98                    break Some(item);
99                } else {
100                    match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
101                        Some((index, item)) => match (*this.commit).cmp(&index) {
102                            Less => {
103                                let prev = this.buffer.insert(index, item);
104                                assert!(
105                                    prev.is_none(),
106                                    "the index number {} appears more than once",
107                                    index
108                                );
109                            }
110                            Equal => {
111                                *this.commit += 1;
112                                break Some(item);
113                            }
114                            Greater => {
115                                panic!("the index number {} appears more than once", index);
116                            }
117                        },
118                        None => {
119                            assert!(
120                                this.buffer.is_empty(),
121                                "the item for index number {} is missing",
122                                this.commit
123                            );
124                            break None;
125                        }
126                    }
127                }
128            })
129        }
130    }
131}