par_iter/iter/
take_any.rs1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use super::{plumbing::*, *};
4
5#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
12#[derive(Clone, Debug)]
13pub struct TakeAny<I: ParallelIterator> {
14 base: I,
15 count: usize,
16}
17
18impl<I> TakeAny<I>
19where
20 I: ParallelIterator,
21{
22 pub(super) fn new(base: I, count: usize) -> Self {
24 TakeAny { base, count }
25 }
26}
27
28impl<I> ParallelIterator for TakeAny<I>
29where
30 I: ParallelIterator,
31{
32 type Item = I::Item;
33
34 fn drive_unindexed<C>(self, consumer: C) -> C::Result
35 where
36 C: UnindexedConsumer<Self::Item>,
37 {
38 let consumer1 = TakeAnyConsumer {
39 base: consumer,
40 count: &AtomicUsize::new(self.count),
41 };
42 self.base.drive_unindexed(consumer1)
43 }
44}
45
46struct TakeAnyConsumer<'f, C> {
50 base: C,
51 count: &'f AtomicUsize,
52}
53
54impl<'f, T, C> Consumer<T> for TakeAnyConsumer<'f, C>
55where
56 C: Consumer<T>,
57 T: Send,
58{
59 type Folder = TakeAnyFolder<'f, C::Folder>;
60 type Reducer = C::Reducer;
61 type Result = C::Result;
62
63 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
64 let (left, right, reducer) = self.base.split_at(index);
65 (
66 TakeAnyConsumer { base: left, ..self },
67 TakeAnyConsumer {
68 base: right,
69 ..self
70 },
71 reducer,
72 )
73 }
74
75 fn into_folder(self) -> Self::Folder {
76 TakeAnyFolder {
77 base: self.base.into_folder(),
78 count: self.count,
79 }
80 }
81
82 fn full(&self) -> bool {
83 self.count.load(Ordering::Relaxed) == 0 || self.base.full()
84 }
85}
86
87impl<'f, T, C> UnindexedConsumer<T> for TakeAnyConsumer<'f, C>
88where
89 C: UnindexedConsumer<T>,
90 T: Send,
91{
92 fn split_off_left(&self) -> Self {
93 TakeAnyConsumer {
94 base: self.base.split_off_left(),
95 ..*self
96 }
97 }
98
99 fn to_reducer(&self) -> Self::Reducer {
100 self.base.to_reducer()
101 }
102}
103
104struct TakeAnyFolder<'f, C> {
105 base: C,
106 count: &'f AtomicUsize,
107}
108
109fn checked_decrement(u: &AtomicUsize) -> bool {
110 u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
111 .is_ok()
112}
113
114impl<'f, T, C> Folder<T> for TakeAnyFolder<'f, C>
115where
116 C: Folder<T>,
117{
118 type Result = C::Result;
119
120 fn consume(mut self, item: T) -> Self {
121 if checked_decrement(self.count) {
122 self.base = self.base.consume(item);
123 }
124 self
125 }
126
127 fn consume_iter<I>(mut self, iter: I) -> Self
128 where
129 I: IntoIterator<Item = T>,
130 {
131 self.base = self.base.consume_iter(
132 iter.into_iter()
133 .take_while(move |_| checked_decrement(self.count)),
134 );
135 self
136 }
137
138 fn complete(self) -> C::Result {
139 self.base.complete()
140 }
141
142 fn full(&self) -> bool {
143 self.count.load(Ordering::Relaxed) == 0 || self.base.full()
144 }
145}