1#![deny(clippy::all)]
61#![deny(missing_docs)]
62
/// Declares a hidden, empty module named `readme_tests` whose documentation
/// is the string expression passed to the macro.
///
/// The macro takes exactly one expression, which is spliced into a
/// `#[doc = ...]` attribute on the generated module.
// NOTE(review): `allow(unused)` presumably silences unused-item lints for this
// helper; confirm it is still required.
#[allow(unused)]
macro_rules! doc_comment {
    ($text:expr) => {
        #[doc = $text]
        #[doc(hidden)]
        mod readme_tests {}
    };
}
71
// Attach the contents of `../README.md` as the documentation of the hidden
// `readme_tests` module generated by `doc_comment!`.
// NOTE(review): presumably done so rustdoc compiles the README's code
// examples as doctests; confirm.
doc_comment!(include_str!("../README.md"));
73
74use rayon::iter::plumbing::{
75 bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
76};
77use std::collections::VecDeque;
78use std::marker::PhantomData;
79use std::sync::{Arc, RwLock};
80
81#[cfg(test)]
82mod tests;
83
84pub trait IntoDynQueue<T, U: Queue<T>> {
86 fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>;
88}
89
/// Minimal queue interface that a `DynQueue` is built on.
///
/// Every method takes `&self`, so implementations have to provide their own
/// interior mutability (the implementations in this file use a lock or a
/// concurrent queue).
// The trait deliberately exposes `len` without an `is_empty` companion.
#[allow(clippy::len_without_is_empty)]
pub trait Queue<T>: Sized {
    /// Adds `v` to the queue.
    fn push(&self, v: T);

    /// Removes one element from the queue and returns it, or returns `None`
    /// when there is nothing to remove.
    fn pop(&self) -> Option<T>;

    /// Returns the number of elements currently in the queue.
    fn len(&self) -> usize;

    /// Moves part of the queue's contents into a new queue and returns it.
    ///
    /// `size` controls where the split happens; which elements end up in the
    /// returned queue is decided by the implementation.
    fn split_off(&self, size: usize) -> Self;
}
108
109impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for Vec<T> {
110 #[inline(always)]
111 fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
112 DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
113 }
114}
115
116impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for RwLock<Vec<T>> {
117 #[inline(always)]
118 fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
119 DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
120 }
121}
122
123impl<T> Queue<T> for RwLock<Vec<T>> {
124 #[inline(always)]
125 fn push(&self, v: T) {
126 self.write().unwrap().push(v)
127 }
128
129 #[inline(always)]
130 fn pop(&self) -> Option<T> {
131 self.write().unwrap().pop()
132 }
133
134 #[inline(always)]
135 fn len(&self) -> usize {
136 self.read().unwrap().len()
137 }
138
139 #[inline(always)]
140 fn split_off(&self, size: usize) -> Self {
141 RwLock::new(self.write().unwrap().split_off(size))
142 }
143}
144
145impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for VecDeque<T> {
146 #[inline(always)]
147 fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
148 DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
149 }
150}
151
152impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for RwLock<VecDeque<T>> {
153 #[inline(always)]
154 fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
155 DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
156 }
157}
158
159impl<T> Queue<T> for RwLock<VecDeque<T>> {
160 #[inline(always)]
161 fn push(&self, v: T) {
162 self.write().unwrap().push_back(v)
163 }
164
165 #[inline(always)]
166 fn pop(&self) -> Option<T> {
167 self.write().unwrap().pop_front()
168 }
169
170 #[inline(always)]
171 fn len(&self) -> usize {
172 self.read().unwrap().len()
173 }
174
175 #[inline(always)]
176 fn split_off(&self, size: usize) -> Self {
177 RwLock::new(self.write().unwrap().split_off(size))
178 }
179}
180
181#[cfg(feature = "crossbeam-queue")]
182use crossbeam_queue::SegQueue;
183
#[cfg(feature = "crossbeam-queue")]
impl<T> IntoDynQueue<T, SegQueue<T>> for SegQueue<T> {
    /// Builds a `DynQueue` that takes ownership of this `SegQueue`.
    #[inline(always)]
    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> {
        let inner = DynQueueInner(self, PhantomData);
        DynQueue(Arc::new(inner))
    }
}
191
/// `Queue` implemented directly on `crossbeam_queue::SegQueue`.
///
/// `push`, `pop` and `len` forward to the `SegQueue` methods of the same name.
#[cfg(feature = "crossbeam-queue")]
impl<T> Queue<T> for SegQueue<T> {
    /// Forwards to `SegQueue::push`.
    #[inline(always)]
    fn push(&self, v: T) {
        SegQueue::push(self, v);
    }

    /// Forwards to `SegQueue::pop`.
    #[inline(always)]
    fn pop(&self) -> Option<T> {
        SegQueue::pop(self)
    }

    /// Forwards to `SegQueue::len`.
    #[inline(always)]
    fn len(&self) -> usize {
        SegQueue::len(self)
    }

    /// Pops up to `size` elements from this queue and pushes them, in pop
    /// order, into a freshly created queue that is returned.
    ///
    /// Exactly `size` pop attempts are made; an attempt that finds the queue
    /// empty is skipped rather than ending the transfer.
    #[inline(always)]
    fn split_off(&self, size: usize) -> Self {
        let moved = SegQueue::new();
        for _ in 0..size {
            if let Some(ele) = Queue::pop(self) {
                moved.push(ele);
            }
        }
        moved
    }
}
218
// State shared (through an `Arc`) by `DynQueue` and `DynQueueHandle`: the
// underlying queue `U`, plus a `PhantomData` marker that ties the type to the
// lifetime `'a` and element type `T` without storing a `T`.
struct DynQueueInner<'a, T, U: Queue<T>>(U, PhantomData<&'a T>);
222
223pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
225
226impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
227 #[inline]
229 pub fn enqueue(&self, job: T) {
230 (self.0).0.push(job)
231 }
232}
233
/// Queue that can be driven as a rayon parallel iterator.
///
/// It wraps a [`Queue`] implementation `U` behind an `Arc`. Iteration yields
/// `(DynQueueHandle, T)` pairs, where the handle refers to the same
/// underlying queue the item was popped from.
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
236
237impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
238where
239 T: Send + Sync,
240 U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
241{
242 type Item = (DynQueueHandle<'a, T, U>, T);
243
244 fn split(self) -> (Self, Option<Self>) {
245 let len = (self.0).0.len();
246
247 if len >= 2 {
248 let new_q = (self.0).0.split_off(len / 2);
249 (self, Some(new_q.into_dyn_queue()))
250 } else {
251 (self, None)
252 }
253 }
254
255 fn fold_with<F>(self, folder: F) -> F
256 where
257 F: Folder<Self::Item>,
258 {
259 let mut folder = folder;
260 loop {
261 let ret = (self.0).0.pop();
262
263 if let Some(v) = ret {
264 folder = folder.consume((DynQueueHandle(self.0.clone()), v));
265
266 if folder.full() {
267 break;
268 }
269 } else {
270 assert_eq!(Arc::strong_count(&self.0), 1, "Stale Handle");
272 break;
273 }
274 }
275 folder
276 }
277}
278
/// Makes `DynQueue` usable as a rayon `ParallelIterator` that yields
/// `(DynQueueHandle, T)` pairs.
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
where
    T: Send + Sync,
    U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
{
    type Item = (DynQueueHandle<'a, T, U>, T);

    /// Drives `consumer` by handing `self` to rayon's `bridge_unindexed`.
    fn drive_unindexed<C>(self, consumer: C) -> <C as Consumer<Self::Item>>::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        bridge_unindexed(self, consumer)
    }
}