par_iter/iter/
while_some.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2
3use super::{plumbing::*, *};
4
5/// `WhileSome` is an iterator that yields the `Some` elements of an iterator,
6/// halting as soon as any `None` is produced.
7///
8/// This struct is created by the [`while_some()`] method on
9/// [`ParallelIterator`]
10///
11/// [`while_some()`]: trait.ParallelIterator.html#method.while_some
12/// [`ParallelIterator`]: trait.ParallelIterator.html
13#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14#[derive(Debug, Clone)]
15pub struct WhileSome<I: ParallelIterator> {
16    base: I,
17}
18
19impl<I> WhileSome<I>
20where
21    I: ParallelIterator,
22{
23    /// Creates a new `WhileSome` iterator.
24    pub(super) fn new(base: I) -> Self {
25        WhileSome { base }
26    }
27}
28
29impl<I, T> ParallelIterator for WhileSome<I>
30where
31    I: ParallelIterator<Item = Option<T>>,
32    T: Send,
33{
34    type Item = T;
35
36    fn drive_unindexed<C>(self, consumer: C) -> C::Result
37    where
38        C: UnindexedConsumer<Self::Item>,
39    {
40        let full = AtomicBool::new(false);
41        let consumer1 = WhileSomeConsumer {
42            base: consumer,
43            full: &full,
44        };
45        self.base.drive_unindexed(consumer1)
46    }
47}
48
49/// ////////////////////////////////////////////////////////////////////////
50/// Consumer implementation
51
52struct WhileSomeConsumer<'f, C> {
53    base: C,
54    full: &'f AtomicBool,
55}
56
57impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
58where
59    C: Consumer<T>,
60    T: Send,
61{
62    type Folder = WhileSomeFolder<'f, C::Folder>;
63    type Reducer = C::Reducer;
64    type Result = C::Result;
65
66    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
67        let (left, right, reducer) = self.base.split_at(index);
68        (
69            WhileSomeConsumer { base: left, ..self },
70            WhileSomeConsumer {
71                base: right,
72                ..self
73            },
74            reducer,
75        )
76    }
77
78    fn into_folder(self) -> Self::Folder {
79        WhileSomeFolder {
80            base: self.base.into_folder(),
81            full: self.full,
82        }
83    }
84
85    fn full(&self) -> bool {
86        self.full.load(Ordering::Relaxed) || self.base.full()
87    }
88}
89
90impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
91where
92    C: UnindexedConsumer<T>,
93    T: Send,
94{
95    fn split_off_left(&self) -> Self {
96        WhileSomeConsumer {
97            base: self.base.split_off_left(),
98            ..*self
99        }
100    }
101
102    fn to_reducer(&self) -> Self::Reducer {
103        self.base.to_reducer()
104    }
105}
106
107struct WhileSomeFolder<'f, C> {
108    base: C,
109    full: &'f AtomicBool,
110}
111
112impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
113where
114    C: Folder<T>,
115{
116    type Result = C::Result;
117
118    fn consume(mut self, item: Option<T>) -> Self {
119        match item {
120            Some(item) => self.base = self.base.consume(item),
121            None => self.full.store(true, Ordering::Relaxed),
122        }
123        self
124    }
125
126    fn consume_iter<I>(mut self, iter: I) -> Self
127    where
128        I: IntoIterator<Item = Option<T>>,
129    {
130        fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ {
131            move |x| match *x {
132                Some(_) => !full.load(Ordering::Relaxed),
133                None => {
134                    full.store(true, Ordering::Relaxed);
135                    false
136                }
137            }
138        }
139
140        self.base = self.base.consume_iter(
141            iter.into_iter()
142                .take_while(some(self.full))
143                .map(Option::unwrap),
144        );
145        self
146    }
147
148    fn complete(self) -> C::Result {
149        self.base.complete()
150    }
151
152    fn full(&self) -> bool {
153        self.full.load(Ordering::Relaxed) || self.base.full()
154    }
155}