orx_concurrent_queue/queue.rs
use crate::{
    atomic_utils::{comp_exch, comp_exch_weak},
    common_traits::iter::{QueueIterOfMut, QueueIterOfRef, QueueIterOwned},
    write_permit::WritePermit,
};
use core::{
    marker::PhantomData,
    ops::Range,
    sync::atomic::{AtomicUsize, Ordering},
};
use orx_fixed_vec::{ConcurrentFixedVec, FixedVec};
use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
use orx_split_vec::{ConcurrentSplitVec, Doubling, Linear, SplitVec, prelude::PseudoDefault};

type DefaultPinnedVec<T> = SplitVec<T, Doubling>;

/// Default concurrent pinned vector used as the underlying storage of the concurrent queue.
pub type DefaultConPinnedVec<T> = <DefaultPinnedVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec;

impl<T> Default for ConcurrentQueue<T, DefaultConPinnedVec<T>>
where
    T: Send,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<T> ConcurrentQueue<T, DefaultConPinnedVec<T>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed by the default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth
    /// (shorthand for [`with_doubling_growth`]).
    ///
    /// In order to create a concurrent queue backed by a particular [`PinnedVec`], you may use the `From` trait.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::{SplitVec, Doubling, Linear};
    /// use orx_fixed_vec::FixedVec;
    ///
    /// let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
    ///
    /// // in order to create a queue from a different pinned vec, use `into` rather than `new`:
    /// let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
    /// let queue: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
    /// ```
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
    pub fn new() -> Self {
        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
    }

    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed by the default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
    /// use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
    ///
    /// let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
    /// ```
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
    pub fn with_doubling_growth() -> Self {
        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
    }
}

impl<T> ConcurrentQueue<T, ConcurrentFixedVec<T>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed by the concurrent version of [`FixedVec`].
    ///
    /// # Panics
    ///
    /// This method does not panic; however, a queue created with a fixed capacity vector
    /// might panic during growth.
    /// If the total number of elements pushed to this queue exceeds the parameter `fixed_capacity`,
    /// the vector cannot grow concurrently and panics.
    /// Please use the other variants to work with a thread safe dynamic capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_fixed_vec::FixedVec;
    ///
    /// let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_fixed_capacity(1024);
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize, _> = FixedVec::new(1024).into();
    /// ```
    ///
    /// [`FixedVec`]: orx_fixed_vec::FixedVec
    pub fn with_fixed_capacity(fixed_capacity: usize) -> Self {
        FixedVec::new(fixed_capacity).into()
    }
}

impl<T> ConcurrentQueue<T, ConcurrentSplitVec<T, Linear>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed by the concurrent version of [`SplitVec`] with [`Linear`] growth.
    ///
    /// # Panics
    ///
    /// This method does not panic; however, a queue created with a linear growth vector
    /// might panic during growth.
    /// Unlike the `FixedVec` backed queue created by [`with_fixed_capacity`], this queue does not pre-allocate;
    /// however, it has an upper bound on how much it can grow.
    /// This upper bound is determined as follows:
    ///
    /// * each fragment of the split vector will have a capacity of `2 ^ constant_fragment_capacity_exponent`, and
    /// * the concurrent split vector can have at most `fragments_capacity` fragments.
    ///
    /// Therefore, this queue cannot grow beyond `fragments_capacity * 2 ^ constant_fragment_capacity_exponent` elements.
    ///
    /// For instance, if the queue is created with
    /// * `with_linear_growth(10, 64)`, its maximum capacity will be 64 x 1024 = 65,536,
    /// * `with_linear_growth(10, 1024)`, its maximum capacity will be 1024 x 1024 = 1,048,576.
    ///
    /// If the total number of elements pushed to this queue exceeds this upper bound,
    /// the vector cannot grow concurrently and panics.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Linear`]: orx_split_vec::Linear
    /// [`with_fixed_capacity`]: ConcurrentQueue::with_fixed_capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(10, 64);
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
    /// ```
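    ///
    /// The bound arithmetic can also be observed directly. The following is a small
    /// sketch of the formula above; the arguments `5` and `4` are chosen here only
    /// for illustration:
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// // fragments of 2^5 = 32 elements and at most 4 fragments:
    /// // the queue can hold at most 4 * 32 = 128 elements
    /// let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(5, 4);
    ///
    /// queue.extend(0..128); // reaches the upper bound exactly
    /// assert_eq!(queue.len(), 128);
    /// // any further push would exceed 4 * 2^5 = 128 and panic
    /// ```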
    pub fn with_linear_growth(
        constant_fragment_capacity_exponent: usize,
        fragments_capacity: usize,
    ) -> Self {
        SplitVec::with_linear_growth_and_fragments_capacity(
            constant_fragment_capacity_exponent,
            fragments_capacity,
        )
        .into()
    }
}

/// A high performance and convenient thread safe queue that can concurrently
/// grow and shrink with [`push`], [`extend`], [`pop`] and [`pull`] capabilities.
///
/// [`push`]: crate::ConcurrentQueue::push
/// [`extend`]: crate::ConcurrentQueue::extend
/// [`pop`]: crate::ConcurrentQueue::pop
/// [`pull`]: crate::ConcurrentQueue::pull
///
/// # Examples
///
/// The following example demonstrates basic usage of the queue within a synchronous program.
/// Note that the push, extend, pop and pull methods can be called with a shared reference `&self`.
/// This makes it convenient to use the queue in a concurrent program.
///
/// ```
/// use orx_concurrent_queue::ConcurrentQueue;
///
/// let queue = ConcurrentQueue::new();
///
/// queue.push(0); // [0]
/// queue.push(1); // [0, 1]
///
/// let x = queue.pop(); // [1]
/// assert_eq!(x, Some(0));
///
/// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
///
/// let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
/// assert_eq!(x, vec![1, 2, 3, 4]);
///
/// assert_eq!(queue.len(), 2);
///
/// let vec = queue.into_inner();
/// assert_eq!(vec, vec![5, 6]);
/// ```
///
/// The following example demonstrates the main purpose of the concurrent queue:
/// to simultaneously push to and pop from the queue.
/// This enables a parallel program where tasks can be handled by multiple threads,
/// while at the same time, new tasks can be created and dynamically added to the queue.
///
/// In the following example, the queue is created with three pre-populated tasks.
/// Every task might potentially lead to new tasks.
/// These new tasks are also added to the back of the queue,
/// to be popped later, potentially adding further tasks to the queue.
///
/// ```
/// use orx_concurrent_queue::ConcurrentQueue;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// struct Task {
///     micros: usize,
/// }
///
/// impl Task {
///     fn perform(&self) {
///         use std::{thread::sleep, time::Duration};
///         sleep(Duration::from_micros(self.micros as u64));
///     }
///
///     fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
///         let range = match self.micros < 5 {
///             true => 0..0,
///             false => 0..self.micros,
///         };
///
///         range.rev().take(5).map(|micros| Self { micros })
///     }
/// }
///
/// let queue = ConcurrentQueue::new();
/// for micros in [10, 15, 10] {
///     queue.push(Task { micros });
/// }
///
/// let num_performed_tasks = AtomicUsize::new(queue.len());
///
/// let num_threads = 8;
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // keep popping a task from front of the queue
///             // as long as the queue is not empty
///             while let Some(task) = queue.pop() {
///                 // create children tasks, add to back
///                 queue.extend(task.child_tasks());
///
///                 // perform the popped task
///                 task.perform();
///
///                 _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
///             }
///         });
///     }
/// });
///
/// assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
/// ```
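///
/// # State
///
/// The queue tracks its state with three atomic counters: the number of positions
/// completely written, the number of positions reserved for writing, and the number
/// of positions popped from the front. At any point in time, the elements in the
/// range `popped..written` are the ones currently in the queue; see [`num_written`],
/// [`num_write_reserved`] and [`num_popped`] for the exact semantics.
///
/// [`num_written`]: ConcurrentQueue::num_written
/// [`num_write_reserved`]: ConcurrentQueue::num_write_reserved
/// [`num_popped`]: ConcurrentQueue::num_popped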
pub struct ConcurrentQueue<T, P = DefaultConPinnedVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    vec: P,
    phantom: PhantomData<T>,
    written: AtomicUsize,
    write_reserved: AtomicUsize,
    popped: AtomicUsize,
}

unsafe impl<T, P> Sync for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
}

impl<T, P> Drop for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    fn drop(&mut self) {
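        // Elements in `popped..written` are the only ones still owned by the queue:
        // drop them in place if the element type requires drop. The pinned vector's
        // length is then set to zero so that it does not drop any element again.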
        if core::mem::needs_drop::<T>() {
            let popped = self.popped.load(Ordering::Relaxed);
            let written = self.written.load(Ordering::Relaxed);
            for i in popped..written {
                let ptr = unsafe { self.ptr(i) };
                unsafe { ptr.drop_in_place() };
            }
        }
        unsafe { self.vec.set_pinned_vec_len(0) };
    }
}

impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
where
    T: Send,
    P: IntoConcurrentPinnedVec<T>,
{
    fn from(vec: P) -> Self {
        Self {
            phantom: PhantomData,
            written: vec.len().into(),
            write_reserved: vec.len().into(),
            popped: 0.into(),
            vec: vec.into_concurrent(),
        }
    }
}

impl<T, P> ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    /// Converts the queue into the underlying pinned vector.
    ///
    /// Whenever the second generic parameter is omitted, the underlying pinned vector is [`SplitVec`] with [`Doubling`] growth.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(0); // [0]
    /// queue.push(1); // [0, 1]
    /// _ = queue.pop(); // [1]
    /// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
    /// _ = queue.pull(4).unwrap(); // [5, 6]
    ///
    /// let vec: SplitVec<i32> = queue.into_inner();
    /// assert_eq!(vec, vec![5, 6]);
    ///
    /// let vec: Vec<i32> = vec.to_vec();
    /// assert_eq!(vec, vec![5, 6]);
    /// ```
    pub fn into_inner(mut self) -> <P as ConcurrentPinnedVec<T>>::P
    where
        <P as ConcurrentPinnedVec<T>>::P:
            PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

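        // elements in `popped..written` are the live ones; if some elements were
        // already popped (a > 0), shift the live range to the front so that the
        // returned pinned vector contains exactly the remaining elements at 0..len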
        let a = self.popped.load(Ordering::Relaxed);
        let b = self.written.load(Ordering::Relaxed);
        let len = b.saturating_sub(a);
        if a > 0 {
            let src = unsafe { vec.ptr_iter_unchecked(a..b) };
            let dst = unsafe { vec.ptr_iter_unchecked(0..len) };
            for (s, d) in src.zip(dst) {
                unsafe { d.write(s.read()) };
            }
        }

        for x in [&self.written, &self.write_reserved, &self.popped] {
            x.store(0, Ordering::Relaxed);
        }

        unsafe { vec.into_inner(len) }
    }

    // shrink

    /// Pops and returns the element in the front of the queue; returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop(), Some(1));
    /// assert_eq!(queue.pop(), Some(2));
    /// assert_eq!(queue.pop(), Some(3));
    /// assert_eq!(queue.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
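        // optimistically reserve the front element by advancing `popped`; if the
        // reserved position is not written yet, either a write is still in progress
        // (keep spinning) or the queue is currently empty: try to roll the
        // reservation back and return None; the roll-back fails if another thread
        // moved `popped` in the meantime, in which case we retry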
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                true => return Some(unsafe { self.ptr(idx).read() }),
                false => {
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }

    /// Pulls `chunk_size` elements from the front of the queue:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![1, 2])
    /// );
    /// assert_eq!(
    ///     queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![3, 4, 5])
    /// );
    /// assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
    /// ```
    pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>> {
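        // optimistically reserve `chunk_size` elements by advancing `popped`; then:
        // - if none of the reserved range is written, try to roll back and return None,
        // - if the whole range is written, return an iterator over the full range,
        // - if only a prefix is written, try to shrink the reservation to that prefix;
        // a failed compare-exchange means another thread interfered, so we retry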
        match chunk_size > 0 {
            true => {
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        (true, true) => Some(begin_idx..end_idx),
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some(QueueIterOwned::new(iter));
                        }
                    }
                }
            }
            false => None,
        }
    }

    // shrink with idx

    /// Pops and returns the element in the front of the queue together with its index;
    /// returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop_with_idx(), Some((0, 1)));
    /// assert_eq!(queue.pop_with_idx(), Some((1, 2)));
    /// assert_eq!(queue.pop_with_idx(), Some((2, 3)));
    /// assert_eq!(queue.pop_with_idx(), None);
    /// ```
    pub fn pop_with_idx(&self) -> Option<(usize, T)> {
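        // same optimistic reservation and roll-back scheme as in `pop`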
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                true => return Some((idx, unsafe { self.ptr(idx).read() })),
                false => {
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }

    /// Pulls `chunk_size` elements from the front of the queue together with the index of the first pulled element:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue. Therefore, knowing the index of the first pulled element,
    /// indices of all pulled elements can be known.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull_with_idx(2).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    ///     Some(vec![(0, 1), (1, 2)])
    /// );
    /// assert_eq!(
    ///     queue.pull_with_idx(7).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    ///     Some(vec![(2, 3), (3, 4), (4, 5)])
    /// );
    /// assert_eq!(queue.pull_with_idx(1).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()), None);
    /// ```
    pub fn pull_with_idx(&self, chunk_size: usize) -> Option<(usize, QueueIterOwned<'_, T, P>)> {
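        // same optimistic reservation and shrinking scheme as in `pull`; the index
        // of the first pulled element is returned together with the iterator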
        match chunk_size > 0 {
            true => {
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        (true, true) => Some(begin_idx..end_idx),
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some((begin_idx, QueueIterOwned::new(iter)));
                        }
                    }
                }
            }
            false => None,
        }
    }

    // grow

    /// Pushes the `value` to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3]);
    /// ```
    pub fn push(&self, value: T) {
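        // reserve the next position to write to; depending on the current capacity,
        // the permit tells this thread to write directly, to grow the vector first,
        // or to spin while another thread is growing it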
        let idx = self.write_reserved.fetch_add(1, Ordering::Relaxed);
        self.assert_has_capacity_for(idx);

        loop {
            match WritePermit::for_one(self.vec.capacity(), idx) {
                WritePermit::JustWrite => {
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                WritePermit::GrowThenWrite => {
                    self.grow_to(idx + 1);
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                WritePermit::Spin => {}
            }
        }

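        // publish the write: wait until all preceding writes are published so that
        // `written` always covers a contiguous range of initialized elements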
        let num_written = idx + 1;
        while comp_exch_weak(&self.written, idx, num_written).is_err() {}
    }

    /// Extends the queue by pushing the elements of `values` to the back of the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `extend` might be preferred over `push` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..3);
    /// queue.extend(vec![3, 4, 5, 6]);
    ///
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
    /// ```
    pub fn extend<I, Iter>(&self, values: I)
    where
        I: IntoIterator<Item = T, IntoIter = Iter>,
        Iter: ExactSizeIterator<Item = T>,
    {
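        // reserve `num_items` consecutive positions at once, write all values under
        // a single permit, and finally publish them by advancing `written` from
        // `begin_idx` to `end_idx` once all preceding writes are published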
        let values = values.into_iter();
        let num_items = values.len();

        if num_items > 0 {
            let begin_idx = self.write_reserved.fetch_add(num_items, Ordering::Relaxed);
            let end_idx = begin_idx + num_items;
            let last_idx = begin_idx + num_items - 1;
            self.assert_has_capacity_for(last_idx);

            loop {
                match WritePermit::for_many(self.vec.capacity(), begin_idx, last_idx) {
                    WritePermit::JustWrite => {
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    WritePermit::GrowThenWrite => {
                        self.grow_to(end_idx);
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    WritePermit::Spin => {}
                }
            }

            while comp_exch_weak(&self.written, begin_idx, end_idx).is_err() {}
        }
    }

    // get

    /// Returns the number of elements in the queue.
    ///
    /// Importantly, note that `len` is a shorthand for:
    ///
    /// ```ignore
    /// let written = self.num_written(Ordering::Relaxed);
    /// let popped = self.num_popped(Ordering::Relaxed);
    /// written.saturating_sub(popped)
    /// ```
    ///
    /// When a different ordering is required, you may write your own `len` method
    /// using the [`num_written`] and [`num_popped`] methods.
    ///
    /// [`num_written`]: ConcurrentQueue::num_written
    /// [`num_popped`]: ConcurrentQueue::num_popped
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// assert_eq!(queue.len(), 2);
    ///
    /// queue.extend(vec![3, 4, 5, 6]);
    /// assert_eq!(queue.len(), 6);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.len(), 5);
    ///
    /// _ = queue.pull(4);
    /// assert_eq!(queue.len(), 1);
    /// ```
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.written
            .load(Ordering::Relaxed)
            .saturating_sub(self.popped.load(Ordering::Relaxed))
    }

    /// Returns the total number of positions written; i.e., the number of times
    /// we pushed plus the sum of lengths of the iterators that we extended the
    /// queue with.
    ///
    /// See [`num_write_reserved`] to get the number of positions which are
    /// reserved to be written.
    ///
    /// Note that in a synchronous program, the number of reserved positions
    /// will be equal to the number of written positions.
    ///
    /// In a concurrent program, however, it is possible to observe that
    /// `num_write_reserved >= num_written` since we might observe the
    /// counts while writing of some elements is in progress.
    ///
    /// However, we can never observe `num_write_reserved < num_written`.
    ///
    /// [`num_write_reserved`]: ConcurrentQueue::num_write_reserved
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    /// use std::sync::atomic::Ordering;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 0);
    ///
    /// queue.push(1);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 1);
    ///
    /// queue.extend([2, 3, 4]);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(2);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(10); // only 1 is pulled
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop(); // None
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    /// ```
    #[inline(always)]
    pub fn num_written(&self, order: Ordering) -> usize {
        self.written.load(order)
    }

    /// Returns the total number of positions reserved to be written.
    ///
    /// See [`num_written`] to get the number of elements which are
    /// completely written.
    ///
    /// Note that in a synchronous program, the number of reserved positions
    /// will be equal to the number of written positions.
    ///
    /// In a concurrent program, however, it is possible to observe that
    /// `num_write_reserved >= num_written` since we might observe the
    /// counts while writing of some elements is in progress.
    ///
    /// However, we can never observe `num_write_reserved < num_written`.
    ///
    /// [`num_written`]: ConcurrentQueue::num_written
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    /// use std::sync::atomic::Ordering;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 0);
    ///
    /// queue.push(1);
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 1);
    ///
    /// queue.extend([2, 3, 4]);
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(2);
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(10); // only 1 is pulled
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop(); // None
    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
    /// ```
    #[inline(always)]
    pub fn num_write_reserved(&self, order: Ordering) -> usize {
        self.write_reserved.load(order)
    }

    /// Returns the number of popped elements so far.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    /// use std::sync::atomic::Ordering;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
    ///
    /// queue.push(1);
    /// queue.extend([2, 3, 4]);
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 1);
    ///
    /// _ = queue.pull(2);
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 3);
    ///
    /// _ = queue.pull(10); // only 1 is pulled
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop(); // None
    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
    /// ```
    #[inline(always)]
    pub fn num_popped(&self, order: Ordering) -> usize {
        self.popped.load(order)
    }

    /// Returns true if the queue is empty, false otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert!(queue.is_empty());
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// assert!(!queue.is_empty());
    ///
    /// _ = queue.pull(4);
    /// assert!(queue.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.written.load(Ordering::Relaxed) == self.popped.load(Ordering::Relaxed)
    }

    /// Returns an iterator of references to items in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let mut queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let sum: i32 = queue.iter().sum();
    /// assert_eq!(sum, 6);
    /// ```
    ///
    /// # Safety
    ///
    /// Notice that this call requires a mutually exclusive `&mut self` reference.
    /// This is due to the fact that iterators are lazy and not necessarily consumed immediately.
    /// On the other hand, the concurrent queue allows for popping elements with a shared reference.
    /// Together, these could have led to undefined behavior: an element could be popped
    /// while an iterator still refers to it.
    ///
    /// To prevent this, `iter` requires a mutually exclusive reference, and hence, the following code does not compile.
    ///
    /// ```compile_fail
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let iter = queue.iter(); // iterator over elements 1, 2 and 3
    ///
    /// _ = queue.pop(); // 1 is removed
    ///
    /// let sum = iter.sum(); // would have been UB: 1 is read after being popped
    /// ```
    pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
        QueueIterOfRef::<T, P>::new(self.ptr_iter())
    }

    /// Returns an iterator of mutable references to items in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let mut queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// for x in queue.iter_mut() {
    ///     *x += 10;
    /// }
    ///
    /// assert_eq!(queue.into_inner(), vec![11, 12, 13]);
    /// ```
    pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T> {
        QueueIterOfMut::<T, P>::new(self.ptr_iter())
    }

    // helpers

    #[inline(always)]
    unsafe fn ptr(&self, idx: usize) -> *mut T {
        unsafe { self.vec.get_ptr_mut(idx) }
    }

    #[inline(always)]
    fn assert_has_capacity_for(&self, idx: usize) {
        assert!(
            idx < self.vec.max_capacity(),
            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
        );
    }

    fn grow_to(&self, new_capacity: usize) {
        _ = self
            .vec
            .grow_to(new_capacity)
            .expect("The underlying pinned vector reached its capacity and failed to grow");
    }

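    // elements in `popped..written` are exactly the initialized, not-yet-popped
    // ones; the `&mut self` receiver guarantees that no concurrent push or pop is
    // in progress while this range is computed and used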
    pub(super) fn valid_range(&mut self) -> Range<usize> {
        self.popped.load(Ordering::Relaxed)..self.written.load(Ordering::Relaxed)
    }

    pub(super) fn ptr_iter(&mut self) -> P::PtrIter<'_> {
        let range = self.valid_range();
        // SAFETY: with a mut ref, we ensure that the range contains all and only valid values
        unsafe { self.vec.ptr_iter_unchecked(range) }
    }

    /// Destructs the concurrent queue into its inner pieces:
    /// * the underlying concurrent pinned vector,
    /// * the number of written elements, and
    /// * the number of popped elements.
    ///
    /// # Safety
    ///
    /// Note that the destruction operation itself is safe.
    /// However, it disconnects the concurrent pinned vector from the information
    /// of which elements are taken out and which are still to be dropped.
    /// Therefore, the caller is responsible for dropping all elements within the range
    /// `popped..written`.
    pub unsafe fn destruct(mut self) -> (P, usize, usize)
    where
        <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
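        // with exclusive ownership of the queue, no write can be in progress;
        // hence, the reservation and write counters must agree. The counters are
        // reset below so that dropping `self`, which now holds the swapped-in
        // placeholder vector, does not drop any of the returned elements.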
        let popped = self.popped.load(Ordering::Relaxed);
        let write_reserved = self.write_reserved.load(Ordering::Relaxed);
        let written = self.written.load(Ordering::Relaxed);
        debug_assert_eq!(written, write_reserved);
        debug_assert!(written >= popped);

        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

        self.popped.store(0, Ordering::Relaxed);
        self.write_reserved.store(0, Ordering::Relaxed);
        self.written.store(0, Ordering::Relaxed);

        (vec, written, popped)
    }
1019}