par_iter/iter/
take_any.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use super::{plumbing::*, *};
4
5/// `TakeAny` is an iterator that iterates over `n` elements from anywhere in
6/// `I`. This struct is created by the [`take_any()`] method on
7/// [`ParallelIterator`]
8///
9/// [`take_any()`]: trait.ParallelIterator.html#method.take_any
10/// [`ParallelIterator`]: trait.ParallelIterator.html
11#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
12#[derive(Clone, Debug)]
13pub struct TakeAny<I: ParallelIterator> {
14    base: I,
15    count: usize,
16}
17
18impl<I> TakeAny<I>
19where
20    I: ParallelIterator,
21{
22    /// Creates a new `TakeAny` iterator.
23    pub(super) fn new(base: I, count: usize) -> Self {
24        TakeAny { base, count }
25    }
26}
27
28impl<I> ParallelIterator for TakeAny<I>
29where
30    I: ParallelIterator,
31{
32    type Item = I::Item;
33
34    fn drive_unindexed<C>(self, consumer: C) -> C::Result
35    where
36        C: UnindexedConsumer<Self::Item>,
37    {
38        let consumer1 = TakeAnyConsumer {
39            base: consumer,
40            count: &AtomicUsize::new(self.count),
41        };
42        self.base.drive_unindexed(consumer1)
43    }
44}
45
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer that forwards elements to `base` only while the shared `count`
/// budget has not been used up.
struct TakeAnyConsumer<'f, C> {
    /// Wrapped consumer that receives the elements that are let through.
    base: C,
    /// Shared countdown of elements that may still be let through.
    count: &'f AtomicUsize,
}
53
54impl<'f, T, C> Consumer<T> for TakeAnyConsumer<'f, C>
55where
56    C: Consumer<T>,
57    T: Send,
58{
59    type Folder = TakeAnyFolder<'f, C::Folder>;
60    type Reducer = C::Reducer;
61    type Result = C::Result;
62
63    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
64        let (left, right, reducer) = self.base.split_at(index);
65        (
66            TakeAnyConsumer { base: left, ..self },
67            TakeAnyConsumer {
68                base: right,
69                ..self
70            },
71            reducer,
72        )
73    }
74
75    fn into_folder(self) -> Self::Folder {
76        TakeAnyFolder {
77            base: self.base.into_folder(),
78            count: self.count,
79        }
80    }
81
82    fn full(&self) -> bool {
83        self.count.load(Ordering::Relaxed) == 0 || self.base.full()
84    }
85}
86
87impl<'f, T, C> UnindexedConsumer<T> for TakeAnyConsumer<'f, C>
88where
89    C: UnindexedConsumer<T>,
90    T: Send,
91{
92    fn split_off_left(&self) -> Self {
93        TakeAnyConsumer {
94            base: self.base.split_off_left(),
95            ..*self
96        }
97    }
98
99    fn to_reducer(&self) -> Self::Reducer {
100        self.base.to_reducer()
101    }
102}
103
/// Folder that feeds elements into `base` only while the shared `count`
/// budget can still be decremented.
struct TakeAnyFolder<'f, C> {
    /// Wrapped folder that receives the elements that are let through.
    base: C,
    /// Shared countdown of elements that may still be let through.
    count: &'f AtomicUsize,
}
108
/// Atomically subtracts one from `u` unless it is already zero.
///
/// Returns `true` if the counter was decremented, and `false` if it was zero
/// (in which case it is left untouched). All atomic operations use
/// `Ordering::Relaxed`.
fn checked_decrement(u: &AtomicUsize) -> bool {
    let mut observed = u.load(Ordering::Relaxed);
    loop {
        let next = match observed.checked_sub(1) {
            Some(next) => next,
            // Already at zero: nothing left to claim.
            None => return false,
        };
        match u.compare_exchange_weak(observed, next, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return true,
            // Lost a race (or failed spuriously): retry from the fresh value.
            Err(current) => observed = current,
        }
    }
}
113
114impl<'f, T, C> Folder<T> for TakeAnyFolder<'f, C>
115where
116    C: Folder<T>,
117{
118    type Result = C::Result;
119
120    fn consume(mut self, item: T) -> Self {
121        if checked_decrement(self.count) {
122            self.base = self.base.consume(item);
123        }
124        self
125    }
126
127    fn consume_iter<I>(mut self, iter: I) -> Self
128    where
129        I: IntoIterator<Item = T>,
130    {
131        self.base = self.base.consume_iter(
132            iter.into_iter()
133                .take_while(move |_| checked_decrement(self.count)),
134        );
135        self
136    }
137
138    fn complete(self) -> C::Result {
139        self.base.complete()
140    }
141
142    fn full(&self) -> bool {
143        self.count.load(Ordering::Relaxed) == 0 || self.base.full()
144    }
145}