dynqueue/
lib.rs

1//! DynQueue - dynamically extendable Rayon parallel iterator
2//!
//! A `DynQueue<T>` can be iterated with `into_par_iter`, producing `(DynQueueHandle, T)` elements.
//! Through the `DynQueueHandle`, a new `T` can be enqueued into the `DynQueue<T>`
//! that is currently being iterated over.
6//!
7//! # Example
8//!
9//! ```
10//! use rayon::iter::IntoParallelIterator as _;
11//! use rayon::iter::ParallelIterator as _;
12//!
13//! use dynqueue::IntoDynQueue as _;
14//!
15//! let mut result = vec![1, 2, 3]
16//!     .into_dyn_queue()
17//!     .into_par_iter()
18//!     .map(|(handle, value)| {
19//!         if value == 2 {
20//!             handle.enqueue(4)
21//!         };
22//!         value
23//!     })
24//!     .collect::<Vec<_>>();
25//! result.sort();
26//!
27//! assert_eq!(result, vec![1, 2, 3, 4]);
28//! ```
29//!
30//! # Panics
31//!
//! A `DynQueueHandle` must not outlive the `DynQueue` iterator that produced it.
33//!
34//! ```should_panic
35//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue};
36//!
37//! use rayon::iter::IntoParallelIterator as _;
38//! use rayon::iter::ParallelIterator as _;
39//! use std::sync::RwLock;
40//!
41//! static mut STALE_HANDLE: Option<DynQueueHandle<u8, RwLock<Vec<u8>>>> = None;
42//!
43//! pub fn test_func() -> Vec<u8> {
44//!     vec![1u8, 2u8, 3u8]
45//!         .into_dyn_queue()
46//!         .into_par_iter()
47//!         .map(|(handle, value)| unsafe {
48//!             STALE_HANDLE.replace(handle);
49//!             value
50//!         })
51//!         .collect::<Vec<_>>()
52//! }
53//! // test_func() panics
54//! let result = test_func();
55//! unsafe {
56//!     STALE_HANDLE.as_ref().unwrap().enqueue(4);
57//! }
58//! ```
59
60#![deny(clippy::all)]
61#![deny(missing_docs)]
62
// Attaches the given string as documentation to a hidden, empty module named
// `readme_tests`. The crate invokes it with the README contents.
// `unused` is allowed so the macro may stay defined even if it is never invoked.
#[allow(unused)]
macro_rules! doc_comment {
    ($readme:expr) => {
        #[doc = $readme]
        #[doc(hidden)]
        mod readme_tests {}
    };
}
71
72doc_comment!(include_str!("../README.md"));
73
74use rayon::iter::plumbing::{
75    bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
76};
77use std::collections::VecDeque;
78use std::marker::PhantomData;
79use std::sync::{Arc, RwLock};
80
81#[cfg(test)]
82mod tests;
83
84/// Trait to produce a new DynQueue
85pub trait IntoDynQueue<T, U: Queue<T>> {
86    /// new
87    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>;
88}
89
/// Storage backend of a `DynQueue`.
///
/// Any type implementing `Queue` can be driven by a `DynQueue`. All methods
/// take `&self`, so an implementation has to bring its own interior
/// mutability (the implementations in this crate use an `RwLock` or a
/// `SegQueue`).
// Only `len` is part of the contract; no `is_empty` is required.
#[allow(clippy::len_without_is_empty)]
pub trait Queue<T>: Sized {
    /// Adds `v` to the queue.
    fn push(&self, v: T);

    /// Removes one element from the queue, or returns `None` if it is empty.
    fn pop(&self) -> Option<T>;

    /// Returns the number of elements currently in the queue.
    fn len(&self) -> usize;

    /// Detaches part of the queue and returns it as a new queue.
    ///
    /// How `size` is interpreted is up to the implementation: the
    /// `RwLock<Vec<T>>` and `RwLock<VecDeque<T>>` implementations forward to
    /// the collection's `split_off(size)`, i.e. they split at index `size`,
    /// while the `SegQueue` implementation moves up to `size` popped elements
    /// into the new queue.
    fn split_off(&self, size: usize) -> Self;
}
108
109impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for Vec<T> {
110    #[inline(always)]
111    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
112        DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
113    }
114}
115
116impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for RwLock<Vec<T>> {
117    #[inline(always)]
118    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
119        DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
120    }
121}
122
123impl<T> Queue<T> for RwLock<Vec<T>> {
124    #[inline(always)]
125    fn push(&self, v: T) {
126        self.write().unwrap().push(v)
127    }
128
129    #[inline(always)]
130    fn pop(&self) -> Option<T> {
131        self.write().unwrap().pop()
132    }
133
134    #[inline(always)]
135    fn len(&self) -> usize {
136        self.read().unwrap().len()
137    }
138
139    #[inline(always)]
140    fn split_off(&self, size: usize) -> Self {
141        RwLock::new(self.write().unwrap().split_off(size))
142    }
143}
144
145impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for VecDeque<T> {
146    #[inline(always)]
147    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
148        DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
149    }
150}
151
152impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for RwLock<VecDeque<T>> {
153    #[inline(always)]
154    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
155        DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
156    }
157}
158
159impl<T> Queue<T> for RwLock<VecDeque<T>> {
160    #[inline(always)]
161    fn push(&self, v: T) {
162        self.write().unwrap().push_back(v)
163    }
164
165    #[inline(always)]
166    fn pop(&self) -> Option<T> {
167        self.write().unwrap().pop_front()
168    }
169
170    #[inline(always)]
171    fn len(&self) -> usize {
172        self.read().unwrap().len()
173    }
174
175    #[inline(always)]
176    fn split_off(&self, size: usize) -> Self {
177        RwLock::new(self.write().unwrap().split_off(size))
178    }
179}
180
181#[cfg(feature = "crossbeam-queue")]
182use crossbeam_queue::SegQueue;
183
#[cfg(feature = "crossbeam-queue")]
impl<T> IntoDynQueue<T, SegQueue<T>> for SegQueue<T> {
    /// Uses the `SegQueue` directly as the backing queue of a new `DynQueue`.
    #[inline(always)]
    fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> {
        let inner = DynQueueInner(self, PhantomData);
        DynQueue(Arc::new(inner))
    }
}
191
/// `Queue` backed by a `crossbeam_queue::SegQueue`.
///
/// `push`, `pop` and `len` forward to the `SegQueue` functions of the same
/// name, spelled as qualified paths.
#[cfg(feature = "crossbeam-queue")]
impl<T> Queue<T> for SegQueue<T> {
    #[inline(always)]
    fn push(&self, v: T) {
        SegQueue::<T>::push(self, v);
    }

    #[inline(always)]
    fn pop(&self) -> Option<T> {
        SegQueue::<T>::pop(self)
    }

    #[inline(always)]
    fn len(&self) -> usize {
        SegQueue::<T>::len(self)
    }

    /// Pops up to `size` elements from this queue and pushes them, in the
    /// order they were popped, onto a fresh `SegQueue`, which is returned.
    ///
    /// A pop that yields nothing is skipped; all `size` attempts are made.
    #[inline(always)]
    fn split_off(&self, size: usize) -> Self {
        let detached = SegQueue::new();
        for _ in 0..size {
            if let Some(item) = Queue::pop(self) {
                detached.push(item);
            }
        }
        detached
    }
}
218
219// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
220// but does not always.
221struct DynQueueInner<'a, T, U: Queue<T>>(U, PhantomData<&'a T>);
222
223/// The `DynQueueHandle` returned by the iterator in addition to `T`
224pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
225
226impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
227    /// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
228    #[inline]
229    pub fn enqueue(&self, job: T) {
230        (self.0).0.push(job)
231    }
232}
233
234/// The `DynQueue<T>` which can be parallel iterated over
235pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
236
237impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
238where
239    T: Send + Sync,
240    U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
241{
242    type Item = (DynQueueHandle<'a, T, U>, T);
243
244    fn split(self) -> (Self, Option<Self>) {
245        let len = (self.0).0.len();
246
247        if len >= 2 {
248            let new_q = (self.0).0.split_off(len / 2);
249            (self, Some(new_q.into_dyn_queue()))
250        } else {
251            (self, None)
252        }
253    }
254
255    fn fold_with<F>(self, folder: F) -> F
256    where
257        F: Folder<Self::Item>,
258    {
259        let mut folder = folder;
260        loop {
261            let ret = (self.0).0.pop();
262
263            if let Some(v) = ret {
264                folder = folder.consume((DynQueueHandle(self.0.clone()), v));
265
266                if folder.full() {
267                    break;
268                }
269            } else {
270                // Self shall have the only reference
271                assert_eq!(Arc::strong_count(&self.0), 1, "Stale Handle");
272                break;
273            }
274        }
275        folder
276    }
277}
278
279impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
280where
281    T: Send + Sync,
282    U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
283{
284    type Item = (DynQueueHandle<'a, T, U>, T);
285
286    fn drive_unindexed<C>(self, consumer: C) -> <C as Consumer<Self::Item>>::Result
287    where
288        C: UnindexedConsumer<Self::Item>,
289    {
290        bridge_unindexed(self, consumer)
291    }
292}