orx_concurrent_queue/queue.rs
1use crate::{
2 atomic_utils::{comp_exch, comp_exch_weak},
3 common_traits::iter::{QueueIterOfMut, QueueIterOfRef, QueueIterOwned},
4 write_permit::WritePermit,
5};
6use core::{
7 marker::PhantomData,
8 ops::Range,
9 sync::atomic::{AtomicUsize, Ordering},
10};
11use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
12use orx_split_vec::{Doubling, SplitVec, prelude::PseudoDefault};
13
/// Default pinned vector backing the queue: a [`SplitVec`] with [`Doubling`] growth.
type DefaultPinnedVec<T> = SplitVec<T, Doubling>;
/// Concurrent counterpart of the default pinned vector; used as the default
/// second generic parameter of [`ConcurrentQueue`].
pub type DefaultConVec<T> = <DefaultPinnedVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec;
16
17impl<T> Default for ConcurrentQueue<T, DefaultConVec<T>>
18where
19 T: Send,
20{
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl<T> ConcurrentQueue<T, DefaultConVec<T>>
27where
28 T: Send,
29{
30 /// Creates a new empty concurrent queue.
31 ///
32 /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth.
33 ///
34 /// In order to create a concurrent queue backed with a particular [`PinnedVec`], you may use the `From` trait.
35 ///
36 /// # Examples
37 ///
38 /// ```
39 /// use orx_concurrent_queue::ConcurrentQueue;
40 /// use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
41 /// use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
42 ///
43 /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
44 /// // equivalent to:
45 /// let bag: ConcurrentQueue<usize> = SplitVec::new().into();
46 /// // equivalent to:
47 /// let bag: ConcurrentQueue<usize, ConcurrentSplitVec<_, Doubling>> = SplitVec::with_doubling_growth_and_max_concurrent_capacity().into();
48 ///
49 /// // in order to create a queue from a different pinned vec, use into, rather than new:
50 /// let bag: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
51 /// let bag: ConcurrentQueue<usize, ConcurrentSplitVec<_, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
52 ///
53 /// let bag: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
54 /// let bag: ConcurrentQueue<usize, ConcurrentFixedVec<usize>> = FixedVec::new(1000).into();
55 /// ```
56 ///
57 /// [`SplitVec`]: orx_split_vec::SplitVec
58 /// [`Doubling`]: orx_split_vec::Doubling
59 /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
60 pub fn new() -> Self {
61 SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
62 }
63}
64
65/// A high performance and convenient thread safe queue that can concurrently
66/// grow and shrink with [`push`], [`extend`], [`pop`] and [`pull`] capabilities.
67///
68/// [`push`]: crate::ConcurrentQueue::push
69/// [`extend`]: crate::ConcurrentQueue::extend
70/// [`pop`]: crate::ConcurrentQueue::pop
71/// [`pull`]: crate::ConcurrentQueue::pull
72///
73/// # Examples
74///
75/// The following example demonstrates a basic usage of the queue within a synchronous program.
76/// Note that push, extend, pop and pull methods can be called with a shared reference `&self`.
77/// This allows to use the queue conveniently in a concurrent program.
78///
79/// ```
80/// use orx_concurrent_queue::ConcurrentQueue;
81///
82/// let queue = ConcurrentQueue::new();
83///
84/// queue.push(0); // [0]
85/// queue.push(1); // [0, 1]
86///
87/// let x = queue.pop(); // [1]
88/// assert_eq!(x, Some(0));
89///
90/// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
91///
92/// let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
93/// assert_eq!(x, vec![1, 2, 3, 4]);
94///
95/// assert_eq!(queue.len(), 2);
96///
97/// let vec = queue.into_inner();
98/// assert_eq!(vec, vec![5, 6]);
99/// ```
100/// The following example demonstrates the main purpose of the concurrent queue:
101/// to simultaneously push to and pop from the queue.
102/// This enables a parallel program where tasks can be handled by multiple threads,
103/// while at the same time, new tasks can be created and dynamically added to the queue.
104///
105/// In the following example, the queue is created with three pre-populated tasks.
106/// Every task might potentially lead to new tasks.
107/// These new tasks are also added to the back of the queue,
108/// to be popped later and potentially add new tasks to the queue.
109///
110/// ```
111/// use orx_concurrent_queue::ConcurrentQueue;
112/// use std::sync::atomic::{AtomicUsize, Ordering};
113///
114/// struct Task {
115/// micros: usize,
116/// }
117///
118/// impl Task {
119/// fn perform(&self) {
120/// use std::{thread::sleep, time::Duration};
121/// sleep(Duration::from_micros(self.micros as u64));
122/// }
123///
124/// fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
125/// let range = match self.micros < 5 {
126/// true => 0..0,
127/// false => 0..self.micros,
128/// };
129///
130/// range.rev().take(5).map(|micros| Self { micros })
131/// }
132/// }
133///
134/// let queue = ConcurrentQueue::new();
135/// for micros in [10, 15, 10] {
136/// queue.push(Task { micros });
137/// }
138///
139/// let num_performed_tasks = AtomicUsize::new(queue.len());
140///
141/// let num_threads = 8;
142/// std::thread::scope(|s| {
143/// for _ in 0..num_threads {
144/// s.spawn(|| {
145/// // keep popping a task from front of the queue
146/// // as long as the queue is not empty
147/// while let Some(task) = queue.pop() {
148/// // create children tasks, add to back
149/// queue.extend(task.child_tasks());
150///
151/// // perform the popped task
152/// task.perform();
153///
154/// _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
155/// }
156/// });
157/// }
158/// });
159///
160/// assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
161/// ```
pub struct ConcurrentQueue<T, P = DefaultConVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    // Pinned backing storage; written/read through raw pointers obtained
    // from `get_ptr_mut` / `ptr_iter_unchecked`.
    vec: P,
    // The queue logically owns values of type `T` through raw writes/reads.
    phantom: PhantomData<T>,
    // Number of elements fully written; `popped..written` is the valid range.
    written: AtomicUsize,
    // Number of write slots reserved by push/extend; slots in
    // `written..write_reserved` are claimed but not yet published.
    write_reserved: AtomicUsize,
    // Number of elements taken out by pop/pull; may transiently exceed
    // `written` while an over-reservation on an empty queue is rolled back.
    popped: AtomicUsize,
}
173
// SAFETY: all `&self` methods coordinate access to the backing storage through
// the `written`, `write_reserved` and `popped` atomic counters, so that each
// element slot is written once and read out at most once; hence the queue can
// be shared across threads whenever `T: Send`.
unsafe impl<T, P> Sync for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
}
180
impl<T, P> Drop for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    /// Drops the elements still in the queue (indices `popped..written`) and
    /// resets the backing pinned vector's length so it does not drop them again.
    fn drop(&mut self) {
        if core::mem::needs_drop::<T>() {
            let popped = self.popped.load(Ordering::Relaxed);
            let reserved = self.write_reserved.load(Ordering::Relaxed);
            let written = self.written.load(Ordering::Relaxed);
            // With exclusive access there can be no in-flight writer: every
            // reserved slot must already have been published as written.
            assert_eq!(reserved, written);
            // Elements in `popped..written` were written but never read out;
            // drop them in place.
            for i in popped..written {
                let ptr = unsafe { self.ptr(i) };
                unsafe { ptr.drop_in_place() };
            }
        }
        // Zero the pinned vec's length so its own drop does not touch slots
        // that were either never initialized or already dropped above.
        unsafe { self.vec.set_pinned_vec_len(0) };
    }
}
200
201impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
202where
203 T: Send,
204 P: IntoConcurrentPinnedVec<T>,
205{
206 fn from(vec: P) -> Self {
207 Self {
208 phantom: PhantomData,
209 written: vec.len().into(),
210 write_reserved: vec.len().into(),
211 popped: 0.into(),
212 vec: vec.into_concurrent(),
213 }
214 }
215}
216
217impl<T, P> ConcurrentQueue<T, P>
218where
219 T: Send,
220 P: ConcurrentPinnedVec<T>,
221{
    /// Converts the bag into the underlying pinned vector.
    ///
    /// Whenever the second generic parameter is omitted, the underlying pinned vector is [`SplitVec`] with [`Doubling`] growth.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(0); // [0]
    /// queue.push(1); // [0, 1]
    /// _ = queue.pop(); // [1]
    /// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
    /// _ = queue.pull(4).unwrap(); // [5, 6]
    ///
    /// let vec: SplitVec<i32> = queue.into_inner();
    /// assert_eq!(vec, vec![5, 6]);
    ///
    /// let vec: Vec<i32> = vec.to_vec();
    /// assert_eq!(vec, vec![5, 6]);
    /// ```
    pub fn into_inner(mut self) -> <P as ConcurrentPinnedVec<T>>::P
    where
        <P as ConcurrentPinnedVec<T>>::P:
            PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
        // Swap the storage out of `self` so that `self`'s Drop only ever sees
        // an empty placeholder vector.
        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

        // `a..b` is the range of live (written, not yet popped) elements.
        let a = self.popped.load(Ordering::Relaxed);
        let b = self.written.load(Ordering::Relaxed);
        let len = b - a;
        if a > 0 {
            // Shift the live elements down to the front, `a..b` -> `0..len`.
            // The forward element-by-element copy is sound for these
            // overlapping ranges because the destination starts before the
            // source (a > 0).
            let src = unsafe { vec.ptr_iter_unchecked(a..b) };
            let dst = unsafe { vec.ptr_iter_unchecked(0..len) };
            for (s, d) in src.zip(dst) {
                unsafe { d.write(s.read()) };
            }
        }

        // Reset the counters so that Drop (now operating on the placeholder)
        // observes an empty, consistent state.
        for x in [&self.written, &self.write_reserved, &self.popped] {
            x.store(0, Ordering::Relaxed);
        }

        unsafe { vec.into_inner(len) }
    }
275
276 // shrink
277
    /// Pops and returns the element in the front of the queue; returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop(), Some(1));
    /// assert_eq!(queue.pop(), Some(2));
    /// assert_eq!(queue.pop(), Some(3));
    /// assert_eq!(queue.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Optimistically reserve the front element; `popped` may now
        // transiently exceed `written` if the queue is empty.
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            // Acquire load so that, once `idx < written`, the write of the
            // element at `idx` is visible to this thread.
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                // The reserved slot is written; move the value out.
                true => return Some(unsafe { self.ptr(idx).read() }),
                false => {
                    // Queue looks empty: try to roll the reservation back
                    // (popped: idx + 1 -> idx). If the CAS fails, another
                    // thread changed `popped` concurrently; re-check, as an
                    // element might have been written in the meantime.
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }
308
    /// Pulls `chunk_size` elements from the front of the queue:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![1, 2])
    /// );
    /// assert_eq!(
    ///     queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![3, 4, 5])
    /// );
    /// assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
    /// ```
    pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>> {
        match chunk_size > 0 {
            true => {
                // Optimistically reserve `chunk_size` front slots at once.
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    // Acquire load: elements below `written` are fully written
                    // and visible to this thread.
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        // Queue looks empty: try to roll the whole reservation
                        // back (popped: end_idx -> begin_idx). On CAS failure
                        // another thread moved `popped`; retry the loop.
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        // Every reserved slot is already written.
                        (true, true) => Some(begin_idx..end_idx),
                        // Only a prefix of the reservation is written yet:
                        // attempt a partial pull up to `written`.
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            // Partial pull: shrink the reservation from
                            // `end_idx` down to the written bound; on failure
                            // re-evaluate from the top.
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            // SAFETY: the elements in `range` are written and
                            // exclusively reserved by this caller.
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some(QueueIterOwned::new(iter));
                        }
                    }
                }
            }
            false => None,
        }
    }
379
380 // grow
381
    /// Pushes the `value` to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3]);
    /// ```
    pub fn push(&self, value: T) {
        // Reserve the next write slot.
        let idx = self.write_reserved.fetch_add(1, Ordering::Relaxed);
        self.assert_has_capacity_for(idx);

        loop {
            match WritePermit::for_one(self.vec.capacity(), idx) {
                // Current capacity covers `idx`: write directly.
                WritePermit::JustWrite => {
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                // This thread is responsible for growing the vector first.
                WritePermit::GrowThenWrite => {
                    self.grow_to(idx + 1);
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                // Another thread is currently growing the vector; re-check.
                WritePermit::Spin => {}
            }
        }

        // Publish the write by advancing `written` from `idx` to `idx + 1`.
        // The CAS loop waits until all earlier writers have published, so
        // `written` always counts a contiguous, fully initialized prefix.
        let num_written = idx + 1;
        while comp_exch_weak(&self.written, idx, num_written).is_err() {}
    }
418
    /// Extends the queue by pushing `values` elements to the back of the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `extend` might be preferred over `push` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..3);
    /// queue.extend(vec![3, 4, 5, 6]);
    ///
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
    /// ```
    pub fn extend<I, Iter>(&self, values: I)
    where
        I: IntoIterator<Item = T, IntoIter = Iter>,
        Iter: ExactSizeIterator<Item = T>,
    {
        let values = values.into_iter();
        let num_items = values.len();

        if num_items > 0 {
            // Reserve a contiguous range of `num_items` write slots.
            let begin_idx = self.write_reserved.fetch_add(num_items, Ordering::Relaxed);
            let end_idx = begin_idx + num_items;
            let last_idx = begin_idx + num_items - 1;
            self.assert_has_capacity_for(last_idx);

            loop {
                match WritePermit::for_many(self.vec.capacity(), begin_idx, last_idx) {
                    // Current capacity covers the whole range: write directly.
                    WritePermit::JustWrite => {
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    // This thread is responsible for growing the vector first.
                    WritePermit::GrowThenWrite => {
                        self.grow_to(end_idx);
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    // Another thread is currently growing the vector; re-check.
                    WritePermit::Spin => {}
                }
            }

            // Publish: advance `written` from `begin_idx` to `end_idx`, waiting
            // for all earlier writers to have published first.
            while comp_exch_weak(&self.written, begin_idx, end_idx).is_err() {}
        }
    }
473
474 // get
475
476 /// Returns the number of elements in the queue.
477 ///
478 /// # Examples
479 ///
480 /// ```
481 /// use orx_concurrent_queue::ConcurrentQueue;
482 ///
483 /// let queue = ConcurrentQueue::new();
484 ///
485 /// queue.push(1);
486 /// queue.push(2);
487 /// assert_eq!(queue.len(), 2);
488 ///
489 /// queue.extend(vec![3, 4, 5, 6]);
490 /// assert_eq!(queue.len(), 6);
491 ///
492 /// _ = queue.pop();
493 /// assert_eq!(queue.len(), 5);
494 ///
495 /// _ = queue.pull(4);
496 /// assert_eq!(queue.len(), 1);
497 /// ```
498 pub fn len(&self) -> usize {
499 self.written.load(Ordering::Relaxed) - self.popped.load(Ordering::Relaxed)
500 }
501
502 /// Returns true if the queue is empty, false otherwise.
503 ///
504 /// # Examples
505 ///
506 /// ```
507 /// use orx_concurrent_queue::ConcurrentQueue;
508 ///
509 /// let queue = ConcurrentQueue::new();
510 ///
511 /// assert!(queue.is_empty());
512 ///
513 /// queue.push(1);
514 /// queue.push(2);
515 /// assert!(!queue.is_empty());
516 ///
517 /// _ = queue.pull(4);
518 /// assert!(queue.is_empty());
519 /// ```
520 pub fn is_empty(&self) -> bool {
521 self.written.load(Ordering::Relaxed) == self.popped.load(Ordering::Relaxed)
522 }
523
    /// Returns an iterator of references to items in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let mut queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let sum: i32 = queue.iter().sum();
    /// assert_eq!(sum, 6);
    /// ```
    ///
    /// # Safety
    ///
    /// Notice that this call requires a mutually exclusive `&mut self` reference.
    /// This is due to the fact that iterators are lazy and they are not necessarily consumed immediately.
    /// Since the concurrent queue allows popping elements with a shared reference,
    /// holding a live iterator while elements are popped could have led to reading
    /// elements that were already moved out of the queue — undefined behavior.
    ///
    /// To prevent this, `iter` requires a mutually exclusive reference, and hence, the following code does not compile.
    ///
    /// ```compile_fail
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let iter = queue.iter(); // iterator over elements 1, 2 and 3
    ///
    /// _ = queue.pop(); // 1 is removed
    ///
    /// let sum = iter.sum(); // UB
    /// ```
    pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
        QueueIterOfRef::<T, P>::new(self.ptr_iter())
    }
568
569 /// Returns an iterator of mutable references to items in the queue.
570 ///
571 /// # Examples
572 ///
573 /// ```
574 /// use orx_concurrent_queue::ConcurrentQueue;
575 ///
576 /// let mut queue = ConcurrentQueue::new();
577 ///
578 /// queue.push(1);
579 /// queue.push(2);
580 /// queue.push(3);
581 ///
582 /// for x in queue.iter_mut() {
583 /// *x += 10;
584 /// }
585 ///
586 /// assert_eq!(queue.into_inner(), vec![11, 12, 13]);
587 /// ```
588 pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T> {
589 QueueIterOfMut::<T, P>::new(self.ptr_iter())
590 }
591
592 // helpers
593
    /// Returns a mutable pointer to the element slot at `idx` of the backing vector.
    ///
    /// # Safety
    ///
    /// `idx` must be within the capacity of the backing vector. The returned
    /// slot may be uninitialized; reading from it additionally requires that
    /// the element at `idx` has been written (`idx < written`).
    #[inline(always)]
    unsafe fn ptr(&self, idx: usize) -> *mut T {
        unsafe { self.vec.get_ptr_mut(idx) }
    }
598
    /// Asserts that index `idx` can ever be backed by the pinned vector.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is at or beyond the maximum capacity the pinned vector
    /// can reach while remaining concurrently safe.
    #[inline(always)]
    fn assert_has_capacity_for(&self, idx: usize) {
        assert!(
            idx < self.vec.max_capacity(),
            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
        );
    }
606
    /// Grows the backing vector so that its capacity is at least `new_capacity`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying pinned vector fails to grow to the requested capacity.
    fn grow_to(&self, new_capacity: usize) {
        _ = self
            .vec
            .grow_to(new_capacity)
            .expect("The underlying pinned vector reached its capacity and failed to grow");
    }
613
614 fn valid_range(&mut self) -> Range<usize> {
615 self.popped.load(Ordering::Relaxed)..self.written.load(Ordering::Relaxed)
616 }
617
    /// Returns a raw-pointer iterator over all elements currently in the queue.
    pub(crate) fn ptr_iter(&mut self) -> P::PtrIter<'_> {
        let range = self.valid_range();
        // SAFETY: with a mut ref, we ensure that the range contains all and only valid values
        unsafe { self.vec.ptr_iter_unchecked(range) }
    }
623}