orx_concurrent_iter/pullers/
flattened_enumerated_chunk_puller.rs

1use super::ChunkPuller;
2use core::iter::Enumerate;
3
4/// Flattened version of a [`ChunkPuller`] which conveniently implements [`Iterator`].
5///
6/// Similar to the regular chunk puller, a flattened enumerated chunk puller is created from and
7/// linked to and pulls its elements from a [`ConcurrentIter`].
8///
9/// It can be created by calling the [`flattened_with_idx`] method on a chunk puller that is
10/// created by the [`chunk_puller`] method of a concurrent iterator.
11///
12/// Unlike the [`FlattenedChunkPuller`], flattened enumerated chunk puller additionally returns
13/// the indices of elements.
14///
15/// [`ChunkPuller`]: crate::ChunkPuller
16/// [`ConcurrentIter`]: crate::ConcurrentIter
17/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
18/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
19/// [`FlattenedChunkPuller`]: crate::FlattenedChunkPuller
20///
21/// # Examples
22///
23/// See the [`FlattenedChunkPuller`] for detailed examples.
24/// The following example only demonstrates the additional index that is returned by the
25/// next method of the `FlattenedEnumeratedChunkPuller`.
26///
27/// ```
28/// use orx_concurrent_iter::*;
29///
30/// let num_threads = 4;
31/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
32/// let con_iter = data.con_iter();
33///
34/// std::thread::scope(|s| {
35///     for _ in 0..num_threads {
36///         s.spawn(|| {
37///             for (idx, value) in con_iter.chunk_puller(8).flattened_with_idx() {
38///                 assert_eq!(value, &idx.to_string());
39///             }
40///         });
41///     }
42/// });
43/// ```
44pub struct FlattenedEnumeratedChunkPuller<'c, P>
45where
46    P: ChunkPuller + 'c,
47{
48    puller: P,
49    current_begin_idx: usize,
50    current_chunk: Enumerate<P::Chunk<'c>>,
51}
52
53impl<'c, P> From<P> for FlattenedEnumeratedChunkPuller<'c, P>
54where
55    P: ChunkPuller + 'c,
56{
57    fn from(puller: P) -> Self {
58        Self {
59            puller,
60            current_begin_idx: 0,
61            current_chunk: Default::default(),
62        }
63    }
64}
65
66impl<'c, P> FlattenedEnumeratedChunkPuller<'c, P>
67where
68    P: ChunkPuller + 'c,
69{
70    /// Converts the flattened chunk puller back to the chunk puller it
71    /// is created from.
72    pub fn into_chunk_puller(self) -> P {
73        self.puller
74    }
75
76    fn next_chunk(&mut self) -> Option<(usize, P::ChunkItem)> {
77        let puller = unsafe { &mut *(&mut self.puller as *mut P) };
78        match puller.pull_with_idx() {
79            Some((begin_idx, chunk)) => {
80                self.current_begin_idx = begin_idx;
81                self.current_chunk = chunk.enumerate();
82                self.next()
83            }
84            None => None,
85        }
86    }
87}
88
89impl<'c, P> Iterator for FlattenedEnumeratedChunkPuller<'c, P>
90where
91    P: ChunkPuller + 'c,
92{
93    type Item = (usize, P::ChunkItem);
94
95    fn next(&mut self) -> Option<Self::Item> {
96        match self.current_chunk.next() {
97            Some((i, x)) => Some((self.current_begin_idx + i, x)),
98            None => self.next_chunk(),
99        }
100    }
101}