maniac_runtime/runtime/deque.rs
1use std::alloc::{Layout, alloc_zeroed, handle_alloc_error};
2use std::boxed::Box;
3use std::cell::{Cell, UnsafeCell};
4use std::cmp;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9use std::sync::Arc;
10use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
11
12use crate::utils::CachePadded;
13use crossbeam_epoch::{self as epoch, Atomic, Owned};
14use crossbeam_utils::Backoff;
15
16// Minimum buffer capacity.
17const MIN_CAP: usize = 64;
18// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
19const MAX_BATCH: usize = 32;
20// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
21// deallocated as soon as possible.
22const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
23
24/// A buffer that holds tasks in a worker queue.
25///
26/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
27/// *not* deallocate the buffer.
28struct Buffer<T> {
29 /// Pointer to the allocated memory.
30 ptr: *mut T,
31
32 /// Capacity of the buffer. Always a power of two.
33 cap: usize,
34}
35
36unsafe impl<T> Send for Buffer<T> {}
37
38impl<T> Buffer<T> {
39 /// Allocates a new buffer with the specified capacity.
40 fn alloc(cap: usize) -> Buffer<T> {
41 debug_assert_eq!(cap, cap.next_power_of_two());
42
43 let ptr = Box::into_raw(
44 (0..cap)
45 .map(|_| MaybeUninit::<T>::uninit())
46 .collect::<Box<[_]>>(),
47 )
48 .cast::<T>();
49
50 Buffer { ptr, cap }
51 }
52
53 /// Deallocates the buffer.
54 unsafe fn dealloc(self) {
55 drop(unsafe {
56 Box::from_raw(ptr::slice_from_raw_parts_mut(
57 self.ptr.cast::<MaybeUninit<T>>(),
58 self.cap,
59 ))
60 });
61 }
62
63 /// Returns a pointer to the task at the specified `index`.
64 unsafe fn at(&self, index: isize) -> *mut T {
65 // `self.cap` is always a power of two.
66 // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
67 // don't actually have the right to access this memory.
68 unsafe { self.ptr.offset(index & (self.cap - 1) as isize) }
69 }
70
71 /// Writes `task` into the specified `index`.
72 ///
73 /// This method might be concurrently called with another `read` at the same index, which is
74 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
75 /// that would be more expensive and difficult to implement generically for all types `T`.
76 /// Hence, as a hack, we use a volatile write instead.
77 unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
78 unsafe { ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) }
79 }
80
81 /// Reads a task from the specified `index`.
82 ///
83 /// This method might be concurrently called with another `write` at the same index, which is
84 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
85 /// that would be more expensive and difficult to implement generically for all types `T`.
86 /// Hence, as a hack, we use a volatile load instead.
87 unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
88 unsafe { ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) }
89 }
90}
91
92impl<T> Clone for Buffer<T> {
93 fn clone(&self) -> Buffer<T> {
94 *self
95 }
96}
97
98impl<T> Copy for Buffer<T> {}
99
100/// Internal queue data shared between the worker and stealers.
101///
102/// The implementation is based on the following work:
103///
104/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
105/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
106/// PPoPP 2013.][weak-mem]
107/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
108/// atomics. OOPSLA 2013.][checker]
109///
110/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
111/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
112/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
113struct Inner<T> {
114 /// The front index.
115 front: AtomicIsize,
116
117 /// The back index.
118 back: AtomicIsize,
119
120 /// The underlying buffer.
121 buffer: CachePadded<Atomic<Buffer<T>>>,
122}
123
124impl<T> Drop for Inner<T> {
125 fn drop(&mut self) {
126 // Load the back index, front index, and buffer.
127 let b = *self.back.get_mut();
128 let f = *self.front.get_mut();
129
130 unsafe {
131 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
132
133 // Go through the buffer from front to back and drop all tasks in the queue.
134 let mut i = f;
135 while i != b {
136 buffer.deref().at(i).drop_in_place();
137 i = i.wrapping_add(1);
138 }
139
140 // Free the memory allocated by the buffer.
141 buffer.into_owned().into_box().dealloc();
142 }
143 }
144}
145
146/// Worker queue flavor: FIFO or LIFO.
147#[derive(Clone, Copy, Debug, Eq, PartialEq)]
148enum Flavor {
149 /// The first-in first-out flavor.
150 Fifo,
151
152 /// The last-in first-out flavor.
153 Lifo,
154}
155
156/// A worker queue.
157///
158/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
159/// tasks from it. Task schedulers typically create a single worker queue per thread.
160///
161/// # Examples
162///
163/// A FIFO worker:
164///
165/// ```
166/// use crossbeam_deque::{Steal, Worker};
167///
168/// let w = Worker::new_fifo();
169/// let s = w.stealer();
170///
171/// w.push(1);
172/// w.push(2);
173/// w.push(3);
174///
175/// assert_eq!(s.steal(), Steal::Success(1));
176/// assert_eq!(w.pop(), Some(2));
177/// assert_eq!(w.pop(), Some(3));
178/// ```
179///
180/// A LIFO worker:
181///
182/// ```
183/// use crossbeam_deque::{Steal, Worker};
184///
185/// let w = Worker::new_lifo();
186/// let s = w.stealer();
187///
188/// w.push(1);
189/// w.push(2);
190/// w.push(3);
191///
192/// assert_eq!(s.steal(), Steal::Success(1));
193/// assert_eq!(w.pop(), Some(3));
194/// assert_eq!(w.pop(), Some(2));
195/// ```
196pub struct Worker<T> {
197 /// A reference to the inner representation of the queue.
198 inner: Arc<CachePadded<Inner<T>>>,
199
200 /// A copy of `inner.buffer` for quick access.
201 buffer: Cell<Buffer<T>>,
202
203 /// The flavor of the queue.
204 flavor: Flavor,
205 // Indicates that the worker cannot be shared among threads.
206 // _marker: PhantomData<*mut ()>, // !Send + !Sync
207}
208
209unsafe impl<T: Send> Send for Worker<T> {}
210unsafe impl<T: Send> Sync for Worker<T> {}
211
212impl<T> Worker<T> {
213 /// Creates a FIFO worker queue.
214 ///
215 /// Tasks are pushed and popped from opposite ends.
216 ///
217 /// # Examples
218 ///
219 /// ```
220 /// use crossbeam_deque::Worker;
221 ///
222 /// let w = Worker::<i32>::new_fifo();
223 /// ```
224 pub fn new_fifo() -> Worker<T> {
225 let buffer = Buffer::alloc(MIN_CAP);
226
227 let inner = Arc::new(CachePadded::new(Inner {
228 front: AtomicIsize::new(0),
229 back: AtomicIsize::new(0),
230 buffer: CachePadded::new(Atomic::new(buffer)),
231 }));
232
233 Worker {
234 inner,
235 buffer: Cell::new(buffer),
236 flavor: Flavor::Fifo,
237 // _marker: PhantomData,
238 }
239 }
240
241 /// Creates a LIFO worker queue.
242 ///
243 /// Tasks are pushed and popped from the same end.
244 ///
245 /// # Examples
246 ///
247 /// ```
248 /// use crossbeam_deque::Worker;
249 ///
250 /// let w = Worker::<i32>::new_lifo();
251 /// ```
252 pub fn new_lifo() -> Worker<T> {
253 let buffer = Buffer::alloc(MIN_CAP);
254
255 let inner = Arc::new(CachePadded::new(Inner {
256 front: AtomicIsize::new(0),
257 back: AtomicIsize::new(0),
258 buffer: CachePadded::new(Atomic::new(buffer)),
259 }));
260
261 Worker {
262 inner,
263 buffer: Cell::new(buffer),
264 flavor: Flavor::Lifo,
265 // _marker: PhantomData,
266 }
267 }
268
269 /// Creates a stealer for this queue.
270 ///
271 /// The returned stealer can be shared among threads and cloned.
272 ///
273 /// # Examples
274 ///
275 /// ```
276 /// use crossbeam_deque::Worker;
277 ///
278 /// let w = Worker::<i32>::new_lifo();
279 /// let s = w.stealer();
280 /// ```
281 pub fn stealer(&self) -> Stealer<T> {
282 Stealer {
283 inner: self.inner.clone(),
284 flavor: self.flavor,
285 }
286 }
287
288 /// Resizes the internal buffer to the new capacity of `new_cap`.
289 #[cold]
290 unsafe fn resize(&self, new_cap: usize) {
291 // Load the back index, front index, and buffer.
292 let b = self.inner.back.load(Ordering::Relaxed);
293 let f = self.inner.front.load(Ordering::Relaxed);
294 let buffer = self.buffer.get();
295
296 // Allocate a new buffer and copy data from the old buffer to the new one.
297 let new = Buffer::alloc(new_cap);
298 let mut i = f;
299 while i != b {
300 unsafe { ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1) };
301 i = i.wrapping_add(1);
302 }
303
304 let guard = &epoch::pin();
305
306 // Replace the old buffer with the new one.
307 self.buffer.replace(new);
308 let old =
309 self.inner
310 .buffer
311 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
312
313 // Destroy the old buffer later.
314 unsafe { guard.defer_unchecked(move || old.into_owned().into_box().dealloc()) };
315
316 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
317 // it as soon as possible.
318 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
319 guard.flush();
320 }
321 }
322
323 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
324 /// buffer.
325 fn reserve(&self, reserve_cap: usize) {
326 if reserve_cap > 0 {
327 // Compute the current length.
328 let b = self.inner.back.load(Ordering::Relaxed);
329 let f = self.inner.front.load(Ordering::SeqCst);
330 let len = b.wrapping_sub(f) as usize;
331
332 // The current capacity.
333 let cap = self.buffer.get().cap;
334
335 // Is there enough capacity to push `reserve_cap` tasks?
336 if cap - len < reserve_cap {
337 // Keep doubling the capacity as much as is needed.
338 let mut new_cap = cap * 2;
339 while new_cap - len < reserve_cap {
340 new_cap *= 2;
341 }
342
343 // Resize the buffer.
344 unsafe {
345 self.resize(new_cap);
346 }
347 }
348 }
349 }
350
351 /// Returns `true` if the queue is empty.
352 ///
353 /// ```
354 /// use crossbeam_deque::Worker;
355 ///
356 /// let w = Worker::new_lifo();
357 ///
358 /// assert!(w.is_empty());
359 /// w.push(1);
360 /// assert!(!w.is_empty());
361 /// ```
362 pub fn is_empty(&self) -> bool {
363 let b = self.inner.back.load(Ordering::Relaxed);
364 let f = self.inner.front.load(Ordering::SeqCst);
365 b.wrapping_sub(f) <= 0
366 }
367
368 /// Returns the number of tasks in the deque.
369 ///
370 /// ```
371 /// use crossbeam_deque::Worker;
372 ///
373 /// let w = Worker::new_lifo();
374 ///
375 /// assert_eq!(w.len(), 0);
376 /// w.push(1);
377 /// assert_eq!(w.len(), 1);
378 /// w.push(1);
379 /// assert_eq!(w.len(), 2);
380 /// ```
381 pub fn len(&self) -> usize {
382 let b = self.inner.back.load(Ordering::Relaxed);
383 let f = self.inner.front.load(Ordering::SeqCst);
384 b.wrapping_sub(f).max(0) as usize
385 }
386
387 /// Pushes a task into the queue.
388 ///
389 /// # Examples
390 ///
391 /// ```
392 /// use crossbeam_deque::Worker;
393 ///
394 /// let w = Worker::new_lifo();
395 /// w.push(1);
396 /// w.push(2);
397 /// ```
398 fn push_core(&self, task: T) -> bool {
399 // Load the back index, front index, and buffer.
400 let b = self.inner.back.load(Ordering::Relaxed);
401 let f = self.inner.front.load(Ordering::Acquire);
402 let mut buffer = self.buffer.get();
403
404 // Calculate the length of the queue.
405 let len = b.wrapping_sub(f);
406 let was_empty = len <= 0;
407
408 // Is the queue full?
409 if len >= buffer.cap as isize {
410 // Yes. Grow the underlying buffer.
411 unsafe {
412 self.resize(2 * buffer.cap);
413 }
414 buffer = self.buffer.get();
415 }
416
417 // Write `task` into the slot.
418 unsafe {
419 buffer.write(b, MaybeUninit::new(task));
420 }
421
422 atomic::fence(Ordering::Release);
423
424 // Increment the back index.
425 //
426 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
427 // races because it doesn't understand fences.
428 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
429
430 was_empty
431 }
432
433 pub fn push(&self, task: T) {
434 let _ = self.push_core(task);
435 }
436
437 pub fn push_with_status(&self, task: T) -> bool {
438 self.push_core(task)
439 }
440
441 /// Pops a task from the queue.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use crossbeam_deque::Worker;
447 ///
448 /// let w = Worker::new_fifo();
449 /// w.push(1);
450 /// w.push(2);
451 ///
452 /// assert_eq!(w.pop(), Some(1));
453 /// assert_eq!(w.pop(), Some(2));
454 /// assert_eq!(w.pop(), None);
455 /// ```
456 fn pop_core(&self) -> (Option<T>, bool) {
457 // Load the back and front index.
458 let b = self.inner.back.load(Ordering::Relaxed);
459 let f = self.inner.front.load(Ordering::Relaxed);
460
461 // Calculate the length of the queue.
462 let len = b.wrapping_sub(f);
463
464 // Is the queue empty?
465 if len <= 0 {
466 return (None, true);
467 }
468
469 match self.flavor {
470 // Pop from the front of the queue.
471 Flavor::Fifo => {
472 // Try incrementing the front index to pop the task.
473 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
474 let new_f = f.wrapping_add(1);
475
476 if b.wrapping_sub(new_f) < 0 {
477 self.inner.front.store(f, Ordering::Relaxed);
478 return (None, true);
479 }
480
481 unsafe {
482 // Read the popped task.
483 let buffer = self.buffer.get();
484 let task = buffer.read(f).assume_init();
485
486 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
487 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
488 self.resize(buffer.cap / 2);
489 }
490
491 (Some(task), b.wrapping_sub(new_f) <= 0)
492 }
493 }
494
495 // Pop from the back of the queue.
496 Flavor::Lifo => {
497 // Decrement the back index.
498 let b = b.wrapping_sub(1);
499 self.inner.back.store(b, Ordering::Relaxed);
500
501 atomic::fence(Ordering::SeqCst);
502
503 // Load the front index.
504 let f = self.inner.front.load(Ordering::Relaxed);
505
506 // Compute the length after the back index was decremented.
507 let len = b.wrapping_sub(f);
508
509 if len < 0 {
510 // The queue is empty. Restore the back index to the original task.
511 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
512 (None, true)
513 } else {
514 // Read the task to be popped.
515 let buffer = self.buffer.get();
516 let mut task = unsafe { Some(buffer.read(b)) };
517 let mut was_last = false;
518
519 // Are we popping the last task from the queue?
520 if len == 0 {
521 // Try incrementing the front index.
522 if self
523 .inner
524 .front
525 .compare_exchange(
526 f,
527 f.wrapping_add(1),
528 Ordering::SeqCst,
529 Ordering::Relaxed,
530 )
531 .is_err()
532 {
533 // Failed. We didn't pop anything. Reset to `None`.
534 task.take();
535 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
536 return (None, false);
537 }
538
539 // Restore the back index to the original task.
540 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
541 was_last = true;
542 } else {
543 // Shrink the buffer if `len` is less than one fourth of the capacity.
544 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
545 unsafe {
546 self.resize(buffer.cap / 2);
547 }
548 }
549 }
550
551 let result = task.take().map(|t| unsafe { t.assume_init() });
552 let was_last = was_last && result.is_some();
553 (result, was_last)
554 }
555 }
556 }
557 }
558
559 pub fn pop(&self) -> Option<T> {
560 self.pop_core().0
561 }
562
563 pub fn pop_with_status(&self) -> (Option<T>, bool) {
564 self.pop_core()
565 }
566}
567
568impl<T> fmt::Debug for Worker<T> {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 f.pad("Worker { .. }")
571 }
572}
573
574/// A stealer handle of a worker queue.
575///
576/// Stealers can be shared among threads.
577///
578/// Task schedulers typically have a single worker queue per worker thread.
579///
580/// # Examples
581///
582/// ```
583/// use crossbeam_deque::{Steal, Worker};
584///
585/// let w = Worker::new_lifo();
586/// w.push(1);
587/// w.push(2);
588///
589/// let s = w.stealer();
590/// assert_eq!(s.steal(), Steal::Success(1));
591/// assert_eq!(s.steal(), Steal::Success(2));
592/// assert_eq!(s.steal(), Steal::Empty);
593/// ```
594pub struct Stealer<T> {
595 /// A reference to the inner representation of the queue.
596 inner: Arc<CachePadded<Inner<T>>>,
597
598 /// The flavor of the queue.
599 flavor: Flavor,
600}
601
602unsafe impl<T: Send> Send for Stealer<T> {}
603unsafe impl<T: Send> Sync for Stealer<T> {}
604
605impl<T> Stealer<T> {
606 /// Returns `true` if the queue is empty.
607 ///
608 /// ```
609 /// use crossbeam_deque::Worker;
610 ///
611 /// let w = Worker::new_lifo();
612 /// let s = w.stealer();
613 ///
614 /// assert!(s.is_empty());
615 /// w.push(1);
616 /// assert!(!s.is_empty());
617 /// ```
618 pub fn is_empty(&self) -> bool {
619 let f = self.inner.front.load(Ordering::Acquire);
620 atomic::fence(Ordering::SeqCst);
621 let b = self.inner.back.load(Ordering::Acquire);
622 b.wrapping_sub(f) <= 0
623 }
624
625 /// Returns the number of tasks in the deque.
626 ///
627 /// ```
628 /// use crossbeam_deque::Worker;
629 ///
630 /// let w = Worker::new_lifo();
631 /// let s = w.stealer();
632 ///
633 /// assert_eq!(s.len(), 0);
634 /// w.push(1);
635 /// assert_eq!(s.len(), 1);
636 /// w.push(2);
637 /// assert_eq!(s.len(), 2);
638 /// ```
639 pub fn len(&self) -> usize {
640 let f = self.inner.front.load(Ordering::Acquire);
641 atomic::fence(Ordering::SeqCst);
642 let b = self.inner.back.load(Ordering::Acquire);
643 b.wrapping_sub(f).max(0) as usize
644 }
645
646 /// Steals a task from the queue.
647 ///
648 /// # Examples
649 ///
650 /// ```
651 /// use crossbeam_deque::{Steal, Worker};
652 ///
653 /// let w = Worker::new_lifo();
654 /// w.push(1);
655 /// w.push(2);
656 ///
657 /// let s = w.stealer();
658 /// assert_eq!(s.steal(), Steal::Success(1));
659 /// assert_eq!(s.steal(), Steal::Success(2));
660 /// ```
661 fn steal_core(&self) -> StealStatus<T> {
662 // Load the front index.
663 let f = self.inner.front.load(Ordering::Acquire);
664
665 // A SeqCst fence is needed here.
666 //
667 // If the current thread is already pinned (reentrantly), we must manually issue the
668 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
669 // have to.
670 if epoch::is_pinned() {
671 atomic::fence(Ordering::SeqCst);
672 }
673
674 let guard = &epoch::pin();
675
676 // Load the back index.
677 let b = self.inner.back.load(Ordering::Acquire);
678
679 // Is the queue empty?
680 if b.wrapping_sub(f) <= 0 {
681 return StealStatus::Empty;
682 }
683
684 // Load the buffer and read the task at the front.
685 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
686 let task = unsafe { buffer.deref().read(f) };
687
688 // Try incrementing the front index to steal the task.
689 // If the buffer has been swapped or the increment fails, we retry.
690 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
691 || self
692 .inner
693 .front
694 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
695 .is_err()
696 {
697 // We didn't steal this task, forget it.
698 return StealStatus::Retry;
699 }
700
701 let was_last = b.wrapping_sub(f.wrapping_add(1)) <= 0;
702 StealStatus::Success {
703 task: unsafe { task.assume_init() },
704 was_last,
705 }
706 }
707
708 pub fn steal(&self) -> Steal<T> {
709 match self.steal_core() {
710 StealStatus::Empty => Steal::Empty,
711 StealStatus::Retry => Steal::Retry,
712 StealStatus::Success { task, .. } => Steal::Success(task),
713 }
714 }
715
716 pub fn steal_with_status(&self) -> StealStatus<T> {
717 self.steal_core()
718 }
719
720 /// Steals a batch of tasks and pushes them into another worker.
721 ///
722 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
723 /// steal around half of the tasks in the queue, but also not more than some constant limit.
724 ///
725 /// # Examples
726 ///
727 /// ```
728 /// use crossbeam_deque::Worker;
729 ///
730 /// let w1 = Worker::new_fifo();
731 /// w1.push(1);
732 /// w1.push(2);
733 /// w1.push(3);
734 /// w1.push(4);
735 ///
736 /// let s = w1.stealer();
737 /// let w2 = Worker::new_fifo();
738 ///
739 /// let _ = s.steal_batch(&w2);
740 /// assert_eq!(w2.pop(), Some(1));
741 /// assert_eq!(w2.pop(), Some(2));
742 /// ```
743 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
744 self.steal_batch_with_limit(dest, MAX_BATCH)
745 }
746
747 /// Steals no more than `limit` of tasks and pushes them into another worker.
748 ///
749 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
750 /// steal around half of the tasks in the queue, but also not more than the given limit.
751 ///
752 /// # Examples
753 ///
754 /// ```
755 /// use crossbeam_deque::Worker;
756 ///
757 /// let w1 = Worker::new_fifo();
758 /// w1.push(1);
759 /// w1.push(2);
760 /// w1.push(3);
761 /// w1.push(4);
762 /// w1.push(5);
763 /// w1.push(6);
764 ///
765 /// let s = w1.stealer();
766 /// let w2 = Worker::new_fifo();
767 ///
768 /// let _ = s.steal_batch_with_limit(&w2, 2);
769 /// assert_eq!(w2.pop(), Some(1));
770 /// assert_eq!(w2.pop(), Some(2));
771 /// assert_eq!(w2.pop(), None);
772 ///
773 /// w1.push(7);
774 /// w1.push(8);
775 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
776 /// // half of the elements are currently popped, but the number of popped elements is considered
777 /// // an implementation detail that may be changed in the future.
778 /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
779 /// assert_eq!(w2.len(), 3);
780 /// ```
781 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
782 assert!(limit > 0);
783 if Arc::ptr_eq(&self.inner, &dest.inner) {
784 if dest.is_empty() {
785 return Steal::Empty;
786 } else {
787 return Steal::Success(());
788 }
789 }
790
791 // Load the front index.
792 let mut f = self.inner.front.load(Ordering::Acquire);
793
794 // A SeqCst fence is needed here.
795 //
796 // If the current thread is already pinned (reentrantly), we must manually issue the
797 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
798 // have to.
799 if epoch::is_pinned() {
800 atomic::fence(Ordering::SeqCst);
801 }
802
803 let guard = &epoch::pin();
804
805 // Load the back index.
806 let b = self.inner.back.load(Ordering::Acquire);
807
808 // Is the queue empty?
809 let len = b.wrapping_sub(f);
810 if len <= 0 {
811 return Steal::Empty;
812 }
813
814 // Reserve capacity for the stolen batch.
815 let batch_size = cmp::min((len as usize + 1) / 2, limit);
816 dest.reserve(batch_size);
817 let mut batch_size = batch_size as isize;
818
819 // Get the destination buffer and back index.
820 let dest_buffer = dest.buffer.get();
821 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
822
823 // Load the buffer.
824 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
825
826 match self.flavor {
827 // Steal a batch of tasks from the front at once.
828 Flavor::Fifo => {
829 // Copy the batch from the source to the destination buffer.
830 match dest.flavor {
831 Flavor::Fifo => {
832 for i in 0..batch_size {
833 unsafe {
834 let task = buffer.deref().read(f.wrapping_add(i));
835 dest_buffer.write(dest_b.wrapping_add(i), task);
836 }
837 }
838 }
839 Flavor::Lifo => {
840 for i in 0..batch_size {
841 unsafe {
842 let task = buffer.deref().read(f.wrapping_add(i));
843 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
844 }
845 }
846 }
847 }
848
849 // Try incrementing the front index to steal the batch.
850 // If the buffer has been swapped or the increment fails, we retry.
851 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
852 || self
853 .inner
854 .front
855 .compare_exchange(
856 f,
857 f.wrapping_add(batch_size),
858 Ordering::SeqCst,
859 Ordering::Relaxed,
860 )
861 .is_err()
862 {
863 return Steal::Retry;
864 }
865
866 dest_b = dest_b.wrapping_add(batch_size);
867 }
868
869 // Steal a batch of tasks from the front one by one.
870 Flavor::Lifo => {
871 // This loop may modify the batch_size, which triggers a clippy lint warning.
872 // Use a new variable to avoid the warning, and to make it clear we aren't
873 // modifying the loop exit condition during iteration.
874 let original_batch_size = batch_size;
875
876 for i in 0..original_batch_size {
877 // If this is not the first steal, check whether the queue is empty.
878 if i > 0 {
879 // We've already got the current front index. Now execute the fence to
880 // synchronize with other threads.
881 atomic::fence(Ordering::SeqCst);
882
883 // Load the back index.
884 let b = self.inner.back.load(Ordering::Acquire);
885
886 // Is the queue empty?
887 if b.wrapping_sub(f) <= 0 {
888 batch_size = i;
889 break;
890 }
891 }
892
893 // Read the task at the front.
894 let task = unsafe { buffer.deref().read(f) };
895
896 // Try incrementing the front index to steal the task.
897 // If the buffer has been swapped or the increment fails, we retry.
898 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
899 || self
900 .inner
901 .front
902 .compare_exchange(
903 f,
904 f.wrapping_add(1),
905 Ordering::SeqCst,
906 Ordering::Relaxed,
907 )
908 .is_err()
909 {
910 // We didn't steal this task, forget it and break from the loop.
911 batch_size = i;
912 break;
913 }
914
915 // Write the stolen task into the destination buffer.
916 unsafe {
917 dest_buffer.write(dest_b, task);
918 }
919
920 // Move the source front index and the destination back index one step forward.
921 f = f.wrapping_add(1);
922 dest_b = dest_b.wrapping_add(1);
923 }
924
925 // If we didn't steal anything, the operation needs to be retried.
926 if batch_size == 0 {
927 return Steal::Retry;
928 }
929
930 // If stealing into a FIFO queue, stolen tasks need to be reversed.
931 if dest.flavor == Flavor::Fifo {
932 for i in 0..batch_size / 2 {
933 unsafe {
934 let i1 = dest_b.wrapping_sub(batch_size - i);
935 let i2 = dest_b.wrapping_sub(i + 1);
936 let t1 = dest_buffer.read(i1);
937 let t2 = dest_buffer.read(i2);
938 dest_buffer.write(i1, t2);
939 dest_buffer.write(i2, t1);
940 }
941 }
942 }
943 }
944 }
945
946 atomic::fence(Ordering::Release);
947
948 // Update the back index in the destination queue.
949 //
950 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
951 // races because it doesn't understand fences.
952 dest.inner.back.store(dest_b, Ordering::Release);
953
954 // Return with success.
955 Steal::Success(())
956 }
957
958 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
959 ///
960 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
961 /// steal around half of the tasks in the queue, but also not more than some constant limit.
962 ///
963 /// # Examples
964 ///
965 /// ```
966 /// use crossbeam_deque::{Steal, Worker};
967 ///
968 /// let w1 = Worker::new_fifo();
969 /// w1.push(1);
970 /// w1.push(2);
971 /// w1.push(3);
972 /// w1.push(4);
973 ///
974 /// let s = w1.stealer();
975 /// let w2 = Worker::new_fifo();
976 ///
977 /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
978 /// assert_eq!(w2.pop(), Some(2));
979 /// ```
980 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
981 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
982 }
983
984 /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
985 /// that worker.
986 ///
987 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
988 /// steal around half of the tasks in the queue, but also not more than the given limit.
989 ///
990 /// # Examples
991 ///
992 /// ```
993 /// use crossbeam_deque::{Steal, Worker};
994 ///
995 /// let w1 = Worker::new_fifo();
996 /// w1.push(1);
997 /// w1.push(2);
998 /// w1.push(3);
999 /// w1.push(4);
1000 /// w1.push(5);
1001 /// w1.push(6);
1002 ///
1003 /// let s = w1.stealer();
1004 /// let w2 = Worker::new_fifo();
1005 ///
1006 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
1007 /// assert_eq!(w2.pop(), Some(2));
1008 /// assert_eq!(w2.pop(), None);
1009 ///
1010 /// w1.push(7);
1011 /// w1.push(8);
1012 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1013 /// // half of the elements are currently popped, but the number of popped elements is considered
1014 /// // an implementation detail that may be changed in the future.
1015 /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
1016 /// assert_eq!(w2.pop(), Some(4));
1017 /// assert_eq!(w2.pop(), Some(5));
1018 /// assert_eq!(w2.pop(), None);
1019 /// ```
1020 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1021 assert!(limit > 0);
1022 if Arc::ptr_eq(&self.inner, &dest.inner) {
1023 match dest.pop() {
1024 None => return Steal::Empty,
1025 Some(task) => return Steal::Success(task),
1026 }
1027 }
1028
1029 // Load the front index.
1030 let mut f = self.inner.front.load(Ordering::Acquire);
1031
1032 // A SeqCst fence is needed here.
1033 //
1034 // If the current thread is already pinned (reentrantly), we must manually issue the
1035 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
1036 // have to.
1037 if epoch::is_pinned() {
1038 atomic::fence(Ordering::SeqCst);
1039 }
1040
1041 let guard = &epoch::pin();
1042
1043 // Load the back index.
1044 let b = self.inner.back.load(Ordering::Acquire);
1045
1046 // Is the queue empty?
1047 let len = b.wrapping_sub(f);
1048 if len <= 0 {
1049 return Steal::Empty;
1050 }
1051
1052 // Reserve capacity for the stolen batch.
1053 let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1054 dest.reserve(batch_size);
1055 let mut batch_size = batch_size as isize;
1056
1057 // Get the destination buffer and back index.
1058 let dest_buffer = dest.buffer.get();
1059 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1060
1061 // Load the buffer
1062 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1063
1064 // Read the task at the front.
1065 let mut task = unsafe { buffer.deref().read(f) };
1066
1067 match self.flavor {
1068 // Steal a batch of tasks from the front at once.
1069 Flavor::Fifo => {
1070 // Copy the batch from the source to the destination buffer.
1071 match dest.flavor {
1072 Flavor::Fifo => {
1073 for i in 0..batch_size {
1074 unsafe {
1075 let task = buffer.deref().read(f.wrapping_add(i + 1));
1076 dest_buffer.write(dest_b.wrapping_add(i), task);
1077 }
1078 }
1079 }
1080 Flavor::Lifo => {
1081 for i in 0..batch_size {
1082 unsafe {
1083 let task = buffer.deref().read(f.wrapping_add(i + 1));
1084 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1085 }
1086 }
1087 }
1088 }
1089
1090 // Try incrementing the front index to steal the task.
1091 // If the buffer has been swapped or the increment fails, we retry.
1092 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1093 || self
1094 .inner
1095 .front
1096 .compare_exchange(
1097 f,
1098 f.wrapping_add(batch_size + 1),
1099 Ordering::SeqCst,
1100 Ordering::Relaxed,
1101 )
1102 .is_err()
1103 {
1104 // We didn't steal this task, forget it.
1105 return Steal::Retry;
1106 }
1107
1108 dest_b = dest_b.wrapping_add(batch_size);
1109 }
1110
1111 // Steal a batch of tasks from the front one by one.
1112 Flavor::Lifo => {
1113 // Try incrementing the front index to steal the task.
1114 if self
1115 .inner
1116 .front
1117 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1118 .is_err()
1119 {
1120 // We didn't steal this task, forget it.
1121 return Steal::Retry;
1122 }
1123
1124 // Move the front index one step forward.
1125 f = f.wrapping_add(1);
1126
1127 // Repeat the same procedure for the batch steals.
1128 //
1129 // This loop may modify the batch_size, which triggers a clippy lint warning.
1130 // Use a new variable to avoid the warning, and to make it clear we aren't
1131 // modifying the loop exit condition during iteration.
1132 let original_batch_size = batch_size;
1133 for i in 0..original_batch_size {
1134 // We've already got the current front index. Now execute the fence to
1135 // synchronize with other threads.
1136 atomic::fence(Ordering::SeqCst);
1137
1138 // Load the back index.
1139 let b = self.inner.back.load(Ordering::Acquire);
1140
1141 // Is the queue empty?
1142 if b.wrapping_sub(f) <= 0 {
1143 batch_size = i;
1144 break;
1145 }
1146
1147 // Read the task at the front.
1148 let tmp = unsafe { buffer.deref().read(f) };
1149
1150 // Try incrementing the front index to steal the task.
1151 // If the buffer has been swapped or the increment fails, we retry.
1152 if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1153 || self
1154 .inner
1155 .front
1156 .compare_exchange(
1157 f,
1158 f.wrapping_add(1),
1159 Ordering::SeqCst,
1160 Ordering::Relaxed,
1161 )
1162 .is_err()
1163 {
1164 // We didn't steal this task, forget it and break from the loop.
1165 batch_size = i;
1166 break;
1167 }
1168
1169 // Write the previously stolen task into the destination buffer.
1170 unsafe {
1171 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1172 }
1173
1174 // Move the source front index and the destination back index one step forward.
1175 f = f.wrapping_add(1);
1176 dest_b = dest_b.wrapping_add(1);
1177 }
1178
1179 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1180 if dest.flavor == Flavor::Fifo {
1181 for i in 0..batch_size / 2 {
1182 unsafe {
1183 let i1 = dest_b.wrapping_sub(batch_size - i);
1184 let i2 = dest_b.wrapping_sub(i + 1);
1185 let t1 = dest_buffer.read(i1);
1186 let t2 = dest_buffer.read(i2);
1187 dest_buffer.write(i1, t2);
1188 dest_buffer.write(i2, t1);
1189 }
1190 }
1191 }
1192 }
1193 }
1194
1195 atomic::fence(Ordering::Release);
1196
1197 // Update the back index in the destination queue.
1198 //
1199 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1200 // races because it doesn't understand fences.
1201 dest.inner.back.store(dest_b, Ordering::Release);
1202
1203 // Return with success.
1204 Steal::Success(unsafe { task.assume_init() })
1205 }
1206}
1207
1208impl<T> Clone for Stealer<T> {
1209 fn clone(&self) -> Stealer<T> {
1210 Stealer {
1211 inner: self.inner.clone(),
1212 flavor: self.flavor,
1213 }
1214 }
1215}
1216
1217impl<T> fmt::Debug for Stealer<T> {
1218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1219 f.pad("Stealer { .. }")
1220 }
1221}
1222
1223// Bits indicating the state of a slot:
1224// * If a task has been written into the slot, `WRITE` is set.
1225// * If a task has been read from the slot, `READ` is set.
1226// * If the block is being destroyed, `DESTROY` is set.
1227const WRITE: usize = 1;
1228const READ: usize = 2;
1229const DESTROY: usize = 4;
1230
1231// Each block covers one "lap" of indices.
1232const LAP: usize = 64;
1233// The maximum number of values a block can hold.
1234const BLOCK_CAP: usize = LAP - 1;
1235// How many lower bits are reserved for metadata.
1236const SHIFT: usize = 1;
1237// Indicates that the block is not the last one.
1238const HAS_NEXT: usize = 1;
1239
1240/// A slot in a block.
1241struct Slot<T> {
1242 /// The task.
1243 task: UnsafeCell<MaybeUninit<T>>,
1244
1245 /// The state of the slot.
1246 state: AtomicUsize,
1247}
1248
1249impl<T> Slot<T> {
1250 /// Waits until a task is written into the slot.
1251 fn wait_write(&self) {
1252 let backoff = Backoff::new();
1253 while self.state.load(Ordering::Acquire) & WRITE == 0 {
1254 backoff.snooze();
1255 }
1256 }
1257}
1258
1259/// A block in a linked list.
1260///
1261/// Each block in the list can hold up to `BLOCK_CAP` values.
1262struct Block<T> {
1263 /// The next block in the linked list.
1264 next: AtomicPtr<Block<T>>,
1265
1266 /// Slots for values.
1267 slots: [Slot<T>; BLOCK_CAP],
1268}
1269
1270impl<T> Block<T> {
1271 const LAYOUT: Layout = {
1272 let layout = Layout::new::<Self>();
1273 assert!(
1274 layout.size() != 0,
1275 "Block should never be zero-sized, as it has an AtomicPtr field"
1276 );
1277 layout
1278 };
1279
1280 /// Creates an empty block.
1281 fn new() -> Box<Self> {
1282 // SAFETY: layout is not zero-sized
1283 let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
1284 // Handle allocation failure
1285 if ptr.is_null() {
1286 handle_alloc_error(Self::LAYOUT)
1287 }
1288 // SAFETY: This is safe because:
1289 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1290 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1291 // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1292 // holds a MaybeUninit.
1293 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1294 // TODO: unsafe { Box::new_zeroed().assume_init() }
1295 unsafe { Box::from_raw(ptr.cast()) }
1296 }
1297
1298 /// Waits until the next pointer is set.
1299 fn wait_next(&self) -> *mut Block<T> {
1300 let backoff = Backoff::new();
1301 loop {
1302 let next = self.next.load(Ordering::Acquire);
1303 if !next.is_null() {
1304 return next;
1305 }
1306 backoff.snooze();
1307 }
1308 }
1309
1310 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1311 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1312 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1313 // begun destruction of the block.
1314 for i in (0..count).rev() {
1315 let slot = unsafe { (*this).slots.get_unchecked(i) };
1316
1317 // Mark the `DESTROY` bit if a thread is still using the slot.
1318 if slot.state.load(Ordering::Acquire) & READ == 0
1319 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1320 {
1321 // If a thread is still using the slot, it will continue destruction of the block.
1322 return;
1323 }
1324 }
1325
1326 // No thread is using the block, now it is safe to destroy it.
1327 drop(unsafe { Box::from_raw(this) });
1328 }
1329}
1330
1331/// A position in a queue.
1332struct Position<T> {
1333 /// The index in the queue.
1334 index: AtomicUsize,
1335
1336 /// The block in the linked list.
1337 block: AtomicPtr<Block<T>>,
1338}
1339
1340/// An injector queue.
1341///
1342/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1343/// a single injector queue, which is the entry point for new tasks.
1344///
1345/// # Examples
1346///
1347/// ```
1348/// use crossbeam_deque::{Injector, Steal};
1349///
1350/// let q = Injector::new();
1351/// q.push(1);
1352/// q.push(2);
1353///
1354/// assert_eq!(q.steal(), Steal::Success(1));
1355/// assert_eq!(q.steal(), Steal::Success(2));
1356/// assert_eq!(q.steal(), Steal::Empty);
1357/// ```
1358pub struct Injector<T> {
1359 /// The head of the queue.
1360 head: CachePadded<Position<T>>,
1361
1362 /// The tail of the queue.
1363 tail: CachePadded<Position<T>>,
1364
1365 /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1366 _marker: PhantomData<T>,
1367}
1368
1369unsafe impl<T: Send> Send for Injector<T> {}
1370unsafe impl<T: Send> Sync for Injector<T> {}
1371
1372impl<T> Default for Injector<T> {
1373 fn default() -> Self {
1374 let block = Box::into_raw(Block::<T>::new());
1375 Self {
1376 head: CachePadded::new(Position {
1377 block: AtomicPtr::new(block),
1378 index: AtomicUsize::new(0),
1379 }),
1380 tail: CachePadded::new(Position {
1381 block: AtomicPtr::new(block),
1382 index: AtomicUsize::new(0),
1383 }),
1384 _marker: PhantomData,
1385 }
1386 }
1387}
1388
1389impl<T> Injector<T> {
1390 /// Creates a new injector queue.
1391 ///
1392 /// # Examples
1393 ///
1394 /// ```
1395 /// use crossbeam_deque::Injector;
1396 ///
1397 /// let q = Injector::<i32>::new();
1398 /// ```
1399 pub fn new() -> Injector<T> {
1400 Self::default()
1401 }
1402
1403 /// Pushes a task into the queue.
1404 ///
1405 /// # Examples
1406 ///
1407 /// ```
1408 /// use crossbeam_deque::Injector;
1409 ///
1410 /// let w = Injector::new();
1411 /// w.push(1);
1412 /// w.push(2);
1413 /// ```
1414 pub fn push(&self, task: T) {
1415 let backoff = Backoff::new();
1416 let mut tail = self.tail.index.load(Ordering::Acquire);
1417 let mut block = self.tail.block.load(Ordering::Acquire);
1418 let mut next_block = None;
1419
1420 loop {
1421 // Calculate the offset of the index into the block.
1422 let offset = (tail >> SHIFT) % LAP;
1423
1424 // If we reached the end of the block, wait until the next one is installed.
1425 if offset == BLOCK_CAP {
1426 backoff.snooze();
1427 tail = self.tail.index.load(Ordering::Acquire);
1428 block = self.tail.block.load(Ordering::Acquire);
1429 continue;
1430 }
1431
1432 // If we're going to have to install the next block, allocate it in advance in order to
1433 // make the wait for other threads as short as possible.
1434 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1435 next_block = Some(Block::<T>::new());
1436 }
1437
1438 let new_tail = tail + (1 << SHIFT);
1439
1440 // Try advancing the tail forward.
1441 match self.tail.index.compare_exchange_weak(
1442 tail,
1443 new_tail,
1444 Ordering::SeqCst,
1445 Ordering::Acquire,
1446 ) {
1447 Ok(_) => unsafe {
1448 // If we've reached the end of the block, install the next one.
1449 if offset + 1 == BLOCK_CAP {
1450 let next_block = Box::into_raw(next_block.unwrap());
1451 let next_index = new_tail.wrapping_add(1 << SHIFT);
1452
1453 self.tail.block.store(next_block, Ordering::Release);
1454 self.tail.index.store(next_index, Ordering::Release);
1455 (*block).next.store(next_block, Ordering::Release);
1456 }
1457
1458 // Write the task into the slot.
1459 let slot = (*block).slots.get_unchecked(offset);
1460 slot.task.get().write(MaybeUninit::new(task));
1461 slot.state.fetch_or(WRITE, Ordering::Release);
1462
1463 return;
1464 },
1465 Err(t) => {
1466 tail = t;
1467 block = self.tail.block.load(Ordering::Acquire);
1468 backoff.spin();
1469 }
1470 }
1471 }
1472 }
1473
1474 /// Steals a task from the queue.
1475 ///
1476 /// # Examples
1477 ///
1478 /// ```
1479 /// use crossbeam_deque::{Injector, Steal};
1480 ///
1481 /// let q = Injector::new();
1482 /// q.push(1);
1483 /// q.push(2);
1484 ///
1485 /// assert_eq!(q.steal(), Steal::Success(1));
1486 /// assert_eq!(q.steal(), Steal::Success(2));
1487 /// assert_eq!(q.steal(), Steal::Empty);
1488 /// ```
1489 pub fn steal(&self) -> Steal<T> {
1490 let mut head;
1491 let mut block;
1492 let mut offset;
1493
1494 let backoff = Backoff::new();
1495 loop {
1496 head = self.head.index.load(Ordering::Acquire);
1497 block = self.head.block.load(Ordering::Acquire);
1498
1499 // Calculate the offset of the index into the block.
1500 offset = (head >> SHIFT) % LAP;
1501
1502 // If we reached the end of the block, wait until the next one is installed.
1503 if offset == BLOCK_CAP {
1504 backoff.snooze();
1505 } else {
1506 break;
1507 }
1508 }
1509
1510 let mut new_head = head + (1 << SHIFT);
1511
1512 if new_head & HAS_NEXT == 0 {
1513 atomic::fence(Ordering::SeqCst);
1514 let tail = self.tail.index.load(Ordering::Relaxed);
1515
1516 // If the tail equals the head, that means the queue is empty.
1517 if head >> SHIFT == tail >> SHIFT {
1518 return Steal::Empty;
1519 }
1520
1521 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1522 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1523 new_head |= HAS_NEXT;
1524 }
1525 }
1526
1527 // Try moving the head index forward.
1528 if self
1529 .head
1530 .index
1531 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1532 .is_err()
1533 {
1534 return Steal::Retry;
1535 }
1536
1537 unsafe {
1538 // If we've reached the end of the block, move to the next one.
1539 if offset + 1 == BLOCK_CAP {
1540 let next = (*block).wait_next();
1541 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1542 if !(*next).next.load(Ordering::Relaxed).is_null() {
1543 next_index |= HAS_NEXT;
1544 }
1545
1546 self.head.block.store(next, Ordering::Release);
1547 self.head.index.store(next_index, Ordering::Release);
1548 }
1549
1550 // Read the task.
1551 let slot = (*block).slots.get_unchecked(offset);
1552 slot.wait_write();
1553 let task = slot.task.get().read().assume_init();
1554
1555 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1556 // but couldn't because we were busy reading from the slot.
1557 if (offset + 1 == BLOCK_CAP)
1558 || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1559 {
1560 Block::destroy(block, offset);
1561 }
1562
1563 Steal::Success(task)
1564 }
1565 }
1566
1567 /// Steals a batch of tasks and pushes them into a worker.
1568 ///
1569 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1570 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1571 ///
1572 /// # Examples
1573 ///
1574 /// ```
1575 /// use crossbeam_deque::{Injector, Worker};
1576 ///
1577 /// let q = Injector::new();
1578 /// q.push(1);
1579 /// q.push(2);
1580 /// q.push(3);
1581 /// q.push(4);
1582 ///
1583 /// let w = Worker::new_fifo();
1584 /// let _ = q.steal_batch(&w);
1585 /// assert_eq!(w.pop(), Some(1));
1586 /// assert_eq!(w.pop(), Some(2));
1587 /// ```
1588 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1589 self.steal_batch_with_limit(dest, MAX_BATCH)
1590 }
1591
1592 /// Steals no more than of tasks and pushes them into a worker.
1593 ///
1594 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1595 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1596 ///
1597 /// # Examples
1598 ///
1599 /// ```
1600 /// use crossbeam_deque::{Injector, Worker};
1601 ///
1602 /// let q = Injector::new();
1603 /// q.push(1);
1604 /// q.push(2);
1605 /// q.push(3);
1606 /// q.push(4);
1607 /// q.push(5);
1608 /// q.push(6);
1609 ///
1610 /// let w = Worker::new_fifo();
1611 /// let _ = q.steal_batch_with_limit(&w, 2);
1612 /// assert_eq!(w.pop(), Some(1));
1613 /// assert_eq!(w.pop(), Some(2));
1614 /// assert_eq!(w.pop(), None);
1615 ///
1616 /// q.push(7);
1617 /// q.push(8);
1618 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1619 /// // half of the elements are currently popped, but the number of popped elements is considered
1620 /// // an implementation detail that may be changed in the future.
1621 /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1622 /// assert_eq!(w.len(), 3);
1623 /// ```
1624 pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1625 assert!(limit > 0);
1626 let mut head;
1627 let mut block;
1628 let mut offset;
1629
1630 let backoff = Backoff::new();
1631 loop {
1632 head = self.head.index.load(Ordering::Acquire);
1633 block = self.head.block.load(Ordering::Acquire);
1634
1635 // Calculate the offset of the index into the block.
1636 offset = (head >> SHIFT) % LAP;
1637
1638 // If we reached the end of the block, wait until the next one is installed.
1639 if offset == BLOCK_CAP {
1640 backoff.snooze();
1641 } else {
1642 break;
1643 }
1644 }
1645
1646 let mut new_head = head;
1647 let advance;
1648
1649 if new_head & HAS_NEXT == 0 {
1650 atomic::fence(Ordering::SeqCst);
1651 let tail = self.tail.index.load(Ordering::Relaxed);
1652
1653 // If the tail equals the head, that means the queue is empty.
1654 if head >> SHIFT == tail >> SHIFT {
1655 return Steal::Empty;
1656 }
1657
1658 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1659 // the right batch size to steal.
1660 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1661 new_head |= HAS_NEXT;
1662 // We can steal all tasks till the end of the block.
1663 advance = (BLOCK_CAP - offset).min(limit);
1664 } else {
1665 let len = (tail - head) >> SHIFT;
1666 // Steal half of the available tasks.
1667 advance = ((len + 1) / 2).min(limit);
1668 }
1669 } else {
1670 // We can steal all tasks till the end of the block.
1671 advance = (BLOCK_CAP - offset).min(limit);
1672 }
1673
1674 new_head += advance << SHIFT;
1675 let new_offset = offset + advance;
1676
1677 // Try moving the head index forward.
1678 if self
1679 .head
1680 .index
1681 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1682 .is_err()
1683 {
1684 return Steal::Retry;
1685 }
1686
1687 // Reserve capacity for the stolen batch.
1688 let batch_size = new_offset - offset;
1689 dest.reserve(batch_size);
1690
1691 // Get the destination buffer and back index.
1692 let dest_buffer = dest.buffer.get();
1693 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1694
1695 unsafe {
1696 // If we've reached the end of the block, move to the next one.
1697 if new_offset == BLOCK_CAP {
1698 let next = (*block).wait_next();
1699 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1700 if !(*next).next.load(Ordering::Relaxed).is_null() {
1701 next_index |= HAS_NEXT;
1702 }
1703
1704 self.head.block.store(next, Ordering::Release);
1705 self.head.index.store(next_index, Ordering::Release);
1706 }
1707
1708 // Copy values from the injector into the destination queue.
1709 match dest.flavor {
1710 Flavor::Fifo => {
1711 for i in 0..batch_size {
1712 // Read the task.
1713 let slot = (*block).slots.get_unchecked(offset + i);
1714 slot.wait_write();
1715 let task = slot.task.get().read();
1716
1717 // Write it into the destination queue.
1718 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1719 }
1720 }
1721
1722 Flavor::Lifo => {
1723 for i in 0..batch_size {
1724 // Read the task.
1725 let slot = (*block).slots.get_unchecked(offset + i);
1726 slot.wait_write();
1727 let task = slot.task.get().read();
1728
1729 // Write it into the destination queue.
1730 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1731 }
1732 }
1733 }
1734
1735 atomic::fence(Ordering::Release);
1736
1737 // Update the back index in the destination queue.
1738 //
1739 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1740 // data races because it doesn't understand fences.
1741 dest.inner
1742 .back
1743 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1744
1745 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1746 // but couldn't because we were busy reading from the slot.
1747 if new_offset == BLOCK_CAP {
1748 Block::destroy(block, offset);
1749 } else {
1750 for i in offset..new_offset {
1751 let slot = (*block).slots.get_unchecked(i);
1752
1753 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1754 Block::destroy(block, offset);
1755 break;
1756 }
1757 }
1758 }
1759
1760 Steal::Success(())
1761 }
1762 }
1763
1764 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1765 ///
1766 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1767 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1768 ///
1769 /// # Examples
1770 ///
1771 /// ```
1772 /// use crossbeam_deque::{Injector, Steal, Worker};
1773 ///
1774 /// let q = Injector::new();
1775 /// q.push(1);
1776 /// q.push(2);
1777 /// q.push(3);
1778 /// q.push(4);
1779 ///
1780 /// let w = Worker::new_fifo();
1781 /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1782 /// assert_eq!(w.pop(), Some(2));
1783 /// ```
1784 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1785 // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1786 // better, but we may change it in the future to be compatible with the same method in Stealer.
1787 self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1788 }
1789
1790 /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1791 ///
1792 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1793 /// steal around half of the tasks in the queue, but also not more than the given limit.
1794 ///
1795 /// # Examples
1796 ///
1797 /// ```
1798 /// use crossbeam_deque::{Injector, Steal, Worker};
1799 ///
1800 /// let q = Injector::new();
1801 /// q.push(1);
1802 /// q.push(2);
1803 /// q.push(3);
1804 /// q.push(4);
1805 /// q.push(5);
1806 /// q.push(6);
1807 ///
1808 /// let w = Worker::new_fifo();
1809 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1810 /// assert_eq!(w.pop(), Some(2));
1811 /// assert_eq!(w.pop(), None);
1812 ///
1813 /// q.push(7);
1814 /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1815 /// // half of the elements are currently popped, but the number of popped elements is considered
1816 /// // an implementation detail that may be changed in the future.
1817 /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1818 /// assert_eq!(w.pop(), Some(4));
1819 /// assert_eq!(w.pop(), Some(5));
1820 /// assert_eq!(w.pop(), None);
1821 /// ```
1822 pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1823 assert!(limit > 0);
1824 let mut head;
1825 let mut block;
1826 let mut offset;
1827
1828 let backoff = Backoff::new();
1829 loop {
1830 head = self.head.index.load(Ordering::Acquire);
1831 block = self.head.block.load(Ordering::Acquire);
1832
1833 // Calculate the offset of the index into the block.
1834 offset = (head >> SHIFT) % LAP;
1835
1836 // If we reached the end of the block, wait until the next one is installed.
1837 if offset == BLOCK_CAP {
1838 backoff.snooze();
1839 } else {
1840 break;
1841 }
1842 }
1843
1844 let mut new_head = head;
1845 let advance;
1846
1847 if new_head & HAS_NEXT == 0 {
1848 atomic::fence(Ordering::SeqCst);
1849 let tail = self.tail.index.load(Ordering::Relaxed);
1850
1851 // If the tail equals the head, that means the queue is empty.
1852 if head >> SHIFT == tail >> SHIFT {
1853 return Steal::Empty;
1854 }
1855
1856 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1857 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1858 new_head |= HAS_NEXT;
1859 // We can steal all tasks till the end of the block.
1860 advance = (BLOCK_CAP - offset).min(limit);
1861 } else {
1862 let len = (tail - head) >> SHIFT;
1863 // Steal half of the available tasks.
1864 advance = ((len + 1) / 2).min(limit);
1865 }
1866 } else {
1867 // We can steal all tasks till the end of the block.
1868 advance = (BLOCK_CAP - offset).min(limit);
1869 }
1870
1871 new_head += advance << SHIFT;
1872 let new_offset = offset + advance;
1873
1874 // Try moving the head index forward.
1875 if self
1876 .head
1877 .index
1878 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1879 .is_err()
1880 {
1881 return Steal::Retry;
1882 }
1883
1884 // Reserve capacity for the stolen batch.
1885 let batch_size = new_offset - offset - 1;
1886 dest.reserve(batch_size);
1887
1888 // Get the destination buffer and back index.
1889 let dest_buffer = dest.buffer.get();
1890 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1891
1892 unsafe {
1893 // If we've reached the end of the block, move to the next one.
1894 if new_offset == BLOCK_CAP {
1895 let next = (*block).wait_next();
1896 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1897 if !(*next).next.load(Ordering::Relaxed).is_null() {
1898 next_index |= HAS_NEXT;
1899 }
1900
1901 self.head.block.store(next, Ordering::Release);
1902 self.head.index.store(next_index, Ordering::Release);
1903 }
1904
1905 // Read the task.
1906 let slot = (*block).slots.get_unchecked(offset);
1907 slot.wait_write();
1908 let task = slot.task.get().read();
1909
1910 match dest.flavor {
1911 Flavor::Fifo => {
1912 // Copy values from the injector into the destination queue.
1913 for i in 0..batch_size {
1914 // Read the task.
1915 let slot = (*block).slots.get_unchecked(offset + i + 1);
1916 slot.wait_write();
1917 let task = slot.task.get().read();
1918
1919 // Write it into the destination queue.
1920 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1921 }
1922 }
1923
1924 Flavor::Lifo => {
1925 // Copy values from the injector into the destination queue.
1926 for i in 0..batch_size {
1927 // Read the task.
1928 let slot = (*block).slots.get_unchecked(offset + i + 1);
1929 slot.wait_write();
1930 let task = slot.task.get().read();
1931
1932 // Write it into the destination queue.
1933 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1934 }
1935 }
1936 }
1937
1938 atomic::fence(Ordering::Release);
1939
1940 // Update the back index in the destination queue.
1941 //
1942 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1943 // data races because it doesn't understand fences.
1944 dest.inner
1945 .back
1946 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1947
1948 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1949 // but couldn't because we were busy reading from the slot.
1950 if new_offset == BLOCK_CAP {
1951 Block::destroy(block, offset);
1952 } else {
1953 for i in offset..new_offset {
1954 let slot = (*block).slots.get_unchecked(i);
1955
1956 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1957 Block::destroy(block, offset);
1958 break;
1959 }
1960 }
1961 }
1962
1963 Steal::Success(task.assume_init())
1964 }
1965 }
1966
1967 /// Returns `true` if the queue is empty.
1968 ///
1969 /// # Examples
1970 ///
1971 /// ```
1972 /// use crossbeam_deque::Injector;
1973 ///
1974 /// let q = Injector::new();
1975 ///
1976 /// assert!(q.is_empty());
1977 /// q.push(1);
1978 /// assert!(!q.is_empty());
1979 /// ```
1980 pub fn is_empty(&self) -> bool {
1981 let head = self.head.index.load(Ordering::SeqCst);
1982 let tail = self.tail.index.load(Ordering::SeqCst);
1983 head >> SHIFT == tail >> SHIFT
1984 }
1985
1986 /// Returns the number of tasks in the queue.
1987 ///
1988 /// # Examples
1989 ///
1990 /// ```
1991 /// use crossbeam_deque::Injector;
1992 ///
1993 /// let q = Injector::new();
1994 ///
1995 /// assert_eq!(q.len(), 0);
1996 /// q.push(1);
1997 /// assert_eq!(q.len(), 1);
1998 /// q.push(1);
1999 /// assert_eq!(q.len(), 2);
2000 /// ```
2001 pub fn len(&self) -> usize {
2002 loop {
2003 // Load the tail index, then load the head index.
2004 let mut tail = self.tail.index.load(Ordering::SeqCst);
2005 let mut head = self.head.index.load(Ordering::SeqCst);
2006
2007 // If the tail index didn't change, we've got consistent indices to work with.
2008 if self.tail.index.load(Ordering::SeqCst) == tail {
2009 // Erase the lower bits.
2010 tail &= !((1 << SHIFT) - 1);
2011 head &= !((1 << SHIFT) - 1);
2012
2013 // Fix up indices if they fall onto block ends.
2014 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
2015 tail = tail.wrapping_add(1 << SHIFT);
2016 }
2017 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
2018 head = head.wrapping_add(1 << SHIFT);
2019 }
2020
2021 // Rotate indices so that head falls into the first block.
2022 let lap = (head >> SHIFT) / LAP;
2023 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
2024 head = head.wrapping_sub((lap * LAP) << SHIFT);
2025
2026 // Remove the lower bits.
2027 tail >>= SHIFT;
2028 head >>= SHIFT;
2029
2030 // Return the difference minus the number of blocks between tail and head.
2031 return tail - head - tail / LAP;
2032 }
2033 }
2034 }
2035}
2036
2037impl<T> Drop for Injector<T> {
2038 fn drop(&mut self) {
2039 let mut head = *self.head.index.get_mut();
2040 let mut tail = *self.tail.index.get_mut();
2041 let mut block = *self.head.block.get_mut();
2042
2043 // Erase the lower bits.
2044 head &= !((1 << SHIFT) - 1);
2045 tail &= !((1 << SHIFT) - 1);
2046
2047 unsafe {
2048 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
2049 while head != tail {
2050 let offset = (head >> SHIFT) % LAP;
2051
2052 if offset < BLOCK_CAP {
2053 // Drop the task in the slot.
2054 let slot = (*block).slots.get_unchecked(offset);
2055 (*slot.task.get()).assume_init_drop();
2056 } else {
2057 // Deallocate the block and move to the next one.
2058 let next = *(*block).next.get_mut();
2059 drop(Box::from_raw(block));
2060 block = next;
2061 }
2062
2063 head = head.wrapping_add(1 << SHIFT);
2064 }
2065
2066 // Deallocate the last remaining block.
2067 drop(Box::from_raw(block));
2068 }
2069 }
2070}
2071
2072impl<T> fmt::Debug for Injector<T> {
2073 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2074 f.pad("Worker { .. }")
2075 }
2076}
2077
2078/// Possible outcomes of a steal operation.
2079///
2080/// # Examples
2081///
2082/// There are lots of ways to chain results of steal operations together:
2083///
2084/// ```
2085/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2086///
2087/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2088///
2089/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2090/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2091/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2092///
2093/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2094/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2095/// ```
2096#[must_use]
2097#[derive(PartialEq, Eq, Copy, Clone)]
2098pub enum Steal<T> {
2099 /// The queue was empty at the time of stealing.
2100 Empty,
2101
2102 /// At least one task was successfully stolen.
2103 Success(T),
2104
2105 /// The steal operation needs to be retried.
2106 Retry,
2107}
2108pub enum StealStatus<T> {
2109 Empty,
2110 Retry,
2111 Success { task: T, was_last: bool },
2112}
2113
2114impl<T> Steal<T> {
2115 /// Returns `true` if the queue was empty at the time of stealing.
2116 ///
2117 /// # Examples
2118 ///
2119 /// ```
2120 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2121 ///
2122 /// assert!(!Success(7).is_empty());
2123 /// assert!(!Retry::<i32>.is_empty());
2124 ///
2125 /// assert!(Empty::<i32>.is_empty());
2126 /// ```
2127 pub fn is_empty(&self) -> bool {
2128 match self {
2129 Steal::Empty => true,
2130 _ => false,
2131 }
2132 }
2133
2134 /// Returns `true` if at least one task was stolen.
2135 ///
2136 /// # Examples
2137 ///
2138 /// ```
2139 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2140 ///
2141 /// assert!(!Empty::<i32>.is_success());
2142 /// assert!(!Retry::<i32>.is_success());
2143 ///
2144 /// assert!(Success(7).is_success());
2145 /// ```
2146 pub fn is_success(&self) -> bool {
2147 match self {
2148 Steal::Success(_) => true,
2149 _ => false,
2150 }
2151 }
2152
2153 /// Returns `true` if the steal operation needs to be retried.
2154 ///
2155 /// # Examples
2156 ///
2157 /// ```
2158 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2159 ///
2160 /// assert!(!Empty::<i32>.is_retry());
2161 /// assert!(!Success(7).is_retry());
2162 ///
2163 /// assert!(Retry::<i32>.is_retry());
2164 /// ```
2165 pub fn is_retry(&self) -> bool {
2166 match self {
2167 Steal::Retry => true,
2168 _ => false,
2169 }
2170 }
2171
2172 /// Returns the result of the operation, if successful.
2173 ///
2174 /// # Examples
2175 ///
2176 /// ```
2177 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2178 ///
2179 /// assert_eq!(Empty::<i32>.success(), None);
2180 /// assert_eq!(Retry::<i32>.success(), None);
2181 ///
2182 /// assert_eq!(Success(7).success(), Some(7));
2183 /// ```
2184 pub fn success(self) -> Option<T> {
2185 match self {
2186 Steal::Success(res) => Some(res),
2187 _ => None,
2188 }
2189 }
2190
2191 /// If no task was stolen, attempts another steal operation.
2192 ///
2193 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2194 ///
2195 /// * If the second steal resulted in `Success`, it is returned.
2196 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2197 /// * If both resulted in `None`, then `None` is returned.
2198 ///
2199 /// # Examples
2200 ///
2201 /// ```
2202 /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2203 ///
2204 /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2205 /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2206 ///
2207 /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2208 /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2209 ///
2210 /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2211 /// ```
2212 pub fn or_else<F>(self, f: F) -> Steal<T>
2213 where
2214 F: FnOnce() -> Steal<T>,
2215 {
2216 match self {
2217 Steal::Empty => f(),
2218 Steal::Success(_) => self,
2219 Steal::Retry => {
2220 if let Steal::Success(res) = f() {
2221 Steal::Success(res)
2222 } else {
2223 Steal::Retry
2224 }
2225 }
2226 }
2227 }
2228}
2229
2230impl<T> fmt::Debug for Steal<T> {
2231 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2232 match self {
2233 Steal::Empty => f.pad("Empty"),
2234 Steal::Success(_) => f.pad("Success(..)"),
2235 Steal::Retry => f.pad("Retry"),
2236 }
2237 }
2238}
2239
2240impl<T> FromIterator<Steal<T>> for Steal<T> {
2241 /// Consumes items until a `Success` is found and returns it.
2242 ///
2243 /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2244 /// Otherwise, `Empty` is returned.
2245 fn from_iter<I>(iter: I) -> Steal<T>
2246 where
2247 I: IntoIterator<Item = Steal<T>>,
2248 {
2249 let mut retry = false;
2250 for s in iter {
2251 match &s {
2252 Steal::Empty => {}
2253 Steal::Success(_) => return s,
2254 Steal::Retry => retry = true,
2255 }
2256 }
2257
2258 if retry { Steal::Retry } else { Steal::Empty }
2259 }
2260}