par_iter/iter/
blocks.rs

1use super::{plumbing::*, *};
2
3struct BlocksCallback<S, C> {
4    sizes: S,
5    consumer: C,
6    len: usize,
7}
8
9impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
10where
11    C: UnindexedConsumer<T>,
12    S: Iterator<Item = usize>,
13{
14    type Output = C::Result;
15
16    fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
17        let mut remaining_len = self.len;
18        let mut consumer = self.consumer;
19
20        // we need a local variable for the accumulated results
21        // we call the reducer's identity by splitting at 0
22        let (left_consumer, right_consumer, _) = consumer.split_at(0);
23        let mut leftmost_res = left_consumer.into_folder().complete();
24        consumer = right_consumer;
25
26        // now we loop on each block size
27        while remaining_len > 0 && !consumer.full() {
28            // we compute the next block's size
29            let size = self.sizes.next().unwrap_or(usize::MAX);
30            let capped_size = remaining_len.min(size);
31            remaining_len -= capped_size;
32
33            // split the producer
34            let (left_producer, right_producer) = producer.split_at(capped_size);
35            producer = right_producer;
36
37            // split the consumer
38            let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
39            consumer = right_consumer;
40
41            leftmost_res = consumer.to_reducer().reduce(
42                leftmost_res,
43                bridge_producer_consumer(capped_size, left_producer, left_consumer),
44            );
45        }
46        leftmost_res
47    }
48}
49
50/// `ExponentialBlocks` is a parallel iterator that consumes itself as a
51/// sequence of parallel blocks of increasing sizes (exponentially).
52///
53/// This struct is created by the [`by_exponential_blocks()`] method on
54/// [`IndexedParallelIterator`]
55///
56/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
57/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
58#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
59#[derive(Debug, Clone)]
60pub struct ExponentialBlocks<I> {
61    base: I,
62}
63
64impl<I> ExponentialBlocks<I> {
65    pub(super) fn new(base: I) -> Self {
66        Self { base }
67    }
68}
69
70impl<I> ParallelIterator for ExponentialBlocks<I>
71where
72    I: IndexedParallelIterator,
73{
74    type Item = I::Item;
75
76    fn drive_unindexed<C>(self, consumer: C) -> C::Result
77    where
78        C: UnindexedConsumer<Self::Item>,
79    {
80        let first = crate::current_num_threads();
81        let callback = BlocksCallback {
82            consumer,
83            sizes: std::iter::successors(Some(first), exponential_size),
84            len: self.base.len(),
85        };
86        self.base.with_producer(callback)
87    }
88}
89
90fn exponential_size(size: &usize) -> Option<usize> {
91    Some(size.saturating_mul(2))
92}
93
94/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
95/// of parallel blocks of constant sizes.
96///
97/// This struct is created by the [`by_uniform_blocks()`] method on
98/// [`IndexedParallelIterator`]
99///
100/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
101/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
102#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
103#[derive(Debug, Clone)]
104pub struct UniformBlocks<I> {
105    base: I,
106    block_size: usize,
107}
108
109impl<I> UniformBlocks<I> {
110    pub(super) fn new(base: I, block_size: usize) -> Self {
111        Self { base, block_size }
112    }
113}
114
115impl<I> ParallelIterator for UniformBlocks<I>
116where
117    I: IndexedParallelIterator,
118{
119    type Item = I::Item;
120
121    fn drive_unindexed<C>(self, consumer: C) -> C::Result
122    where
123        C: UnindexedConsumer<Self::Item>,
124    {
125        let callback = BlocksCallback {
126            consumer,
127            sizes: std::iter::repeat(self.block_size),
128            len: self.base.len(),
129        };
130        self.base.with_producer(callback)
131    }
132}