1use super::{plumbing::*, *};
2
/// Producer callback that feeds a consumer one block of items at a time.
///
/// The `drive_unindexed` implementations in this file build it and hand it to
/// `with_producer` on the base iterator.
struct BlocksCallback<S, C> {
    /// Yields the requested size of each successive block.
    sizes: S,
    /// Consumer that receives the items of every block.
    consumer: C,
    /// Total number of items to process (filled in from the base iterator's `len()`).
    len: usize,
}
8
9impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
10where
11 C: UnindexedConsumer<T>,
12 S: Iterator<Item = usize>,
13{
14 type Output = C::Result;
15
16 fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
17 let mut remaining_len = self.len;
18 let mut consumer = self.consumer;
19
20 let (left_consumer, right_consumer, _) = consumer.split_at(0);
23 let mut leftmost_res = left_consumer.into_folder().complete();
24 consumer = right_consumer;
25
26 while remaining_len > 0 && !consumer.full() {
28 let size = self.sizes.next().unwrap_or(usize::MAX);
30 let capped_size = remaining_len.min(size);
31 remaining_len -= capped_size;
32
33 let (left_producer, right_producer) = producer.split_at(capped_size);
35 producer = right_producer;
36
37 let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
39 consumer = right_consumer;
40
41 leftmost_res = consumer.to_reducer().reduce(
42 leftmost_res,
43 bridge_producer_consumer(capped_size, left_producer, left_consumer),
44 );
45 }
46 leftmost_res
47 }
48}
49
/// Parallel iterator adaptor that processes its base iterator block by block,
/// with block sizes that grow exponentially.
///
/// The first block holds `current_num_threads()` items and every following
/// block is twice as large as the one before it.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
    /// The wrapped iterator whose items are split into blocks.
    base: I,
}
63
64impl<I> ExponentialBlocks<I> {
65 pub(super) fn new(base: I) -> Self {
66 Self { base }
67 }
68}
69
70impl<I> ParallelIterator for ExponentialBlocks<I>
71where
72 I: IndexedParallelIterator,
73{
74 type Item = I::Item;
75
76 fn drive_unindexed<C>(self, consumer: C) -> C::Result
77 where
78 C: UnindexedConsumer<Self::Item>,
79 {
80 let first = crate::current_num_threads();
81 let callback = BlocksCallback {
82 consumer,
83 sizes: std::iter::successors(Some(first), exponential_size),
84 len: self.base.len(),
85 };
86 self.base.with_producer(callback)
87 }
88}
89
/// Returns the block size that follows `size`: twice as large, saturating at
/// `usize::MAX` instead of overflowing.
///
/// Always returns `Some`, so a sequence built from it never ends.
fn exponential_size(size: &usize) -> Option<usize> {
    let doubled = size.saturating_mul(2);
    Some(doubled)
}
93
/// Parallel iterator adaptor that processes its base iterator block by block,
/// with every block requesting the same number of items.
///
/// The final block may be shorter, since block sizes are capped at the number
/// of items that remain.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
    /// The wrapped iterator whose items are split into blocks.
    base: I,
    /// Number of items requested for each block.
    block_size: usize,
}
108
109impl<I> UniformBlocks<I> {
110 pub(super) fn new(base: I, block_size: usize) -> Self {
111 Self { base, block_size }
112 }
113}
114
115impl<I> ParallelIterator for UniformBlocks<I>
116where
117 I: IndexedParallelIterator,
118{
119 type Item = I::Item;
120
121 fn drive_unindexed<C>(self, consumer: C) -> C::Result
122 where
123 C: UnindexedConsumer<Self::Item>,
124 {
125 let callback = BlocksCallback {
126 consumer,
127 sizes: std::iter::repeat(self.block_size),
128 len: self.base.len(),
129 };
130 self.base.with_producer(callback)
131 }
132}