1use std::sync::atomic::{AtomicBool, Ordering};
9
10use crate::iter::{plumbing::*, *};
11
/// A parallel iterator over the value held by an `Option`.
///
/// It yields exactly one item when the wrapped option is `Some`, and no
/// items when it is `None`.
#[derive(Clone, Debug)]
pub struct IntoIter<T>
where
    T: Send,
{
    /// The option whose payload (if any) is the single item to yield.
    opt: Option<T>,
}
26
27impl<T: Send> IntoParallelIterator for Option<T> {
28 type Item = T;
29 type Iter = IntoIter<T>;
30
31 fn into_par_iter(self) -> Self::Iter {
32 IntoIter { opt: self }
33 }
34}
35
36impl<T: Send> ParallelIterator for IntoIter<T> {
37 type Item = T;
38
39 fn drive_unindexed<C>(self, consumer: C) -> C::Result
40 where
41 C: UnindexedConsumer<Self::Item>,
42 {
43 self.drive(consumer)
44 }
45
46 fn opt_len(&self) -> Option<usize> {
47 Some(self.len())
48 }
49}
50
51impl<T: Send> IndexedParallelIterator for IntoIter<T> {
52 fn drive<C>(self, consumer: C) -> C::Result
53 where
54 C: Consumer<Self::Item>,
55 {
56 let mut folder = consumer.into_folder();
57 if let Some(item) = self.opt {
58 folder = folder.consume(item);
59 }
60 folder.complete()
61 }
62
63 fn len(&self) -> usize {
64 match self.opt {
65 Some(_) => 1,
66 None => 0,
67 }
68 }
69
70 fn with_producer<CB>(self, callback: CB) -> CB::Output
71 where
72 CB: ProducerCallback<Self::Item>,
73 {
74 callback.callback(OptionProducer { opt: self.opt })
75 }
76}
77
78#[derive(Debug)]
90pub struct Iter<'a, T: Sync> {
91 inner: IntoIter<&'a T>,
92}
93
94impl<'a, T: Sync> Clone for Iter<'a, T> {
95 fn clone(&self) -> Self {
96 Iter {
97 inner: self.inner.clone(),
98 }
99 }
100}
101
102impl<'a, T: Sync> IntoParallelIterator for &'a Option<T> {
103 type Item = &'a T;
104 type Iter = Iter<'a, T>;
105
106 fn into_par_iter(self) -> Self::Iter {
107 Iter {
108 inner: self.as_ref().into_par_iter(),
109 }
110 }
111}
112
// Invokes the crate's `delegate_indexed_iterator!` macro for `Iter`, whose
// item type is `&'a T`.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the parallel-iterator trait impls by forwarding to the `inner`
// field — confirm against the macro definition.
delegate_indexed_iterator! {
    Iter<'a, T> => &'a T,
    impl<'a, T: Sync + 'a>
}
117
118#[derive(Debug)]
130pub struct IterMut<'a, T: Send> {
131 inner: IntoIter<&'a mut T>,
132}
133
134impl<'a, T: Send> IntoParallelIterator for &'a mut Option<T> {
135 type Item = &'a mut T;
136 type Iter = IterMut<'a, T>;
137
138 fn into_par_iter(self) -> Self::Iter {
139 IterMut {
140 inner: self.as_mut().into_par_iter(),
141 }
142 }
143}
144
// Invokes the crate's `delegate_indexed_iterator!` macro for `IterMut`,
// whose item type is `&'a mut T`.
// NOTE(review): the macro is defined outside this file; presumably it
// generates the parallel-iterator trait impls by forwarding to the `inner`
// field — confirm against the macro definition.
delegate_indexed_iterator! {
    IterMut<'a, T> => &'a mut T,
    impl<'a, T: Send + 'a>
}
149
/// Private producer backing the option iterators: it owns the option and
/// therefore holds at most one item.
struct OptionProducer<T>
where
    T: Send,
{
    /// The remaining item, or `None` once it has been split away or was never
    /// present.
    opt: Option<T>,
}
154
155impl<T: Send> Producer for OptionProducer<T> {
156 type IntoIter = std::option::IntoIter<T>;
157 type Item = T;
158
159 fn into_iter(self) -> Self::IntoIter {
160 self.opt.into_iter()
161 }
162
163 fn split_at(self, index: usize) -> (Self, Self) {
164 debug_assert!(index <= 1);
165 let none = OptionProducer { opt: None };
166 if index == 0 {
167 (none, self)
168 } else {
169 (self, none)
170 }
171 }
172}
173
174impl<C, T> FromParallelIterator<Option<T>> for Option<C>
179where
180 C: FromParallelIterator<T>,
181 T: Send,
182{
183 fn from_par_iter<I>(par_iter: I) -> Self
184 where
185 I: IntoParallelIterator<Item = Option<T>>,
186 {
187 fn check<T>(found_none: &AtomicBool) -> impl Fn(&Option<T>) + '_ {
188 move |item| {
189 if item.is_none() {
190 found_none.store(true, Ordering::Relaxed);
191 }
192 }
193 }
194
195 let found_none = AtomicBool::new(false);
196 let collection = par_iter
197 .into_par_iter()
198 .inspect(check(&found_none))
199 .while_some()
200 .collect();
201
202 if found_none.load(Ordering::Relaxed) {
203 None
204 } else {
205 Some(collection)
206 }
207 }
208}