orx_concurrent_iter/pullers/flattened_enumerated_chunk_puller.rs
1use super::ChunkPuller;
2use core::iter::Enumerate;
3
4/// Flattened version of a [`ChunkPuller`] which conveniently implements [`Iterator`].
5///
/// Similar to the regular chunk puller, a flattened enumerated chunk puller is created from,
/// linked to, and pulls its elements from a [`ConcurrentIter`].
8///
9/// It can be created by calling the [`flattened_with_idx`] method on a chunk puller that is
10/// created by the [`chunk_puller`] method of a concurrent iterator.
11///
12/// Unlike the [`FlattenedChunkPuller`], flattened enumerated chunk puller additionally returns
13/// the indices of elements.
14///
15/// [`ChunkPuller`]: crate::ChunkPuller
16/// [`ConcurrentIter`]: crate::ConcurrentIter
17/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
18/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
19/// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
20///
21/// # Examples
22///
23/// See the [`FlattenedChunkPuller`] for detailed examples.
24/// The following example only demonstrates the additional index that is returned by the
25/// next method of the `FlattenedEnumeratedChunkPuller`.
26///
27/// ```
28/// use orx_concurrent_iter::*;
29///
30/// let num_threads = 4;
31/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
32/// let con_iter = data.con_iter();
33///
34/// std::thread::scope(|s| {
35/// for _ in 0..num_threads {
36/// s.spawn(|| {
37/// for (idx, value) in con_iter.chunk_puller(8).flattened_with_idx() {
38/// assert_eq!(value, &idx.to_string());
39/// }
40/// });
41/// }
42/// });
43/// ```
44pub struct FlattenedEnumeratedChunkPuller<'c, P>
45where
46 P: ChunkPuller + 'c,
47{
48 puller: P,
49 current_begin_idx: usize,
50 current_chunk: Enumerate<P::Chunk<'c>>,
51}
52
53impl<'c, P> From<P> for FlattenedEnumeratedChunkPuller<'c, P>
54where
55 P: ChunkPuller + 'c,
56{
57 fn from(puller: P) -> Self {
58 Self {
59 puller,
60 current_begin_idx: 0,
61 current_chunk: Default::default(),
62 }
63 }
64}
65
66impl<'c, P> FlattenedEnumeratedChunkPuller<'c, P>
67where
68 P: ChunkPuller + 'c,
69{
70 /// Converts the flattened chunk puller back to the chunk puller it
71 /// is created from.
72 pub fn into_chunk_puller(self) -> P {
73 self.puller
74 }
75
76 fn next_chunk(&mut self) -> Option<(usize, P::ChunkItem)> {
77 let puller = unsafe { &mut *(&mut self.puller as *mut P) };
78 match puller.pull_with_idx() {
79 Some((begin_idx, chunk)) => {
80 self.current_begin_idx = begin_idx;
81 self.current_chunk = chunk.enumerate();
82 self.next()
83 }
84 None => None,
85 }
86 }
87}
88
89impl<'c, P> Iterator for FlattenedEnumeratedChunkPuller<'c, P>
90where
91 P: ChunkPuller + 'c,
92{
93 type Item = (usize, P::ChunkItem);
94
95 fn next(&mut self) -> Option<Self::Item> {
96 match self.current_chunk.next() {
97 Some((i, x)) => Some((self.current_begin_idx + i, x)),
98 None => self.next_chunk(),
99 }
100 }
101}