// maniac_runtime/runtime/deque.rs

1use std::alloc::{Layout, alloc_zeroed, handle_alloc_error};
2use std::boxed::Box;
3use std::cell::{Cell, UnsafeCell};
4use std::cmp;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9use std::sync::Arc;
10use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
11
12use crate::utils::CachePadded;
13use crossbeam_epoch::{self as epoch, Atomic, Owned};
14use crossbeam_utils::Backoff;
15
16// Minimum buffer capacity.
17const MIN_CAP: usize = 64;
18// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
19const MAX_BATCH: usize = 32;
20// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
21// deallocated as soon as possible.
22const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
23
/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
/// *not* deallocate the buffer.
///
/// Slots may be uninitialized: which range of slots holds live tasks is tracked externally by
/// the `front`/`back` indices in `Inner`, never by the buffer itself.
struct Buffer<T> {
    /// Pointer to the allocated memory: `cap` contiguous (possibly uninitialized) slots.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two, which lets `at()` wrap indices with a
    /// cheap bitmask instead of a modulo.
    cap: usize,
}
35
// SAFETY: `Buffer` is a plain (pointer, capacity) pair with no thread affinity. Ownership of
// the pointed-to memory and of the tasks inside it is managed by `Inner`/`Worker`, so moving
// this handle across threads is sound on its own.
unsafe impl<T> Send for Buffer<T> {}
37
impl<T> Buffer<T> {
    /// Allocates a new buffer with the specified capacity.
    ///
    /// The slots are left uninitialized. `cap` must be a power of two (checked in debug
    /// builds) so that index wrapping in `at()` can use a bitmask.
    fn alloc(cap: usize) -> Buffer<T> {
        debug_assert_eq!(cap, cap.next_power_of_two());

        // Allocate through `Box<[MaybeUninit<T>]>` so the layout matches exactly what
        // `dealloc` reconstructs below, then keep only the raw pointer.
        let ptr = Box::into_raw(
            (0..cap)
                .map(|_| MaybeUninit::<T>::uninit())
                .collect::<Box<[_]>>(),
        )
        .cast::<T>();

        Buffer { ptr, cap }
    }

    /// Deallocates the buffer.
    ///
    /// # Safety
    ///
    /// `self` must have been produced by `Buffer::alloc` and must not be used or deallocated
    /// again afterwards. Tasks still stored in the buffer are *not* dropped — the caller is
    /// responsible for dropping them first (see `Inner::drop`).
    unsafe fn dealloc(self) {
        // SAFETY (caller-guaranteed): this reconstructs exactly the `Box<[MaybeUninit<T>]>`
        // created in `alloc`, so `Box::from_raw` sees a valid allocation.
        drop(unsafe {
            Box::from_raw(ptr::slice_from_raw_parts_mut(
                self.ptr.cast::<MaybeUninit<T>>(),
                self.cap,
            ))
        });
    }

    /// Returns a pointer to the task at the specified `index`.
    ///
    /// # Safety
    ///
    /// The buffer must still be allocated. The returned pointer may alias a slot that is
    /// concurrently accessed; it must only be dereferenced through `read`/`write`.
    unsafe fn at(&self, index: isize) -> *mut T {
        // `self.cap` is always a power of two, so `index & (cap - 1)` == `index mod cap`.
        // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
        // don't actually have the right to access this memory.
        unsafe { self.ptr.offset(index & (self.cap - 1) as isize) }
    }

    /// Writes `task` into the specified `index`.
    ///
    /// This method might be concurrently called with another `read` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile write instead.
    unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
        unsafe { ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) }
    }

    /// Reads a task from the specified `index`.
    ///
    /// This method might be concurrently called with another `write` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile load instead.
    unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
        unsafe { ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) }
    }
}
91
92impl<T> Clone for Buffer<T> {
93    fn clone(&self) -> Buffer<T> {
94        *self
95    }
96}
97
98impl<T> Copy for Buffer<T> {}
99
/// Internal queue data shared between the worker and stealers.
///
/// The implementation is based on the following work:
///
/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
///    PPoPP 2013.][weak-mem]
/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
///    atomics. OOPSLA 2013.][checker]
///
/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
///
/// Indices grow monotonically with wrapping arithmetic; the queue length is
/// `back.wrapping_sub(front)` and slot positions are taken modulo the buffer capacity.
struct Inner<T> {
    /// The front index. Advanced by stealers and by FIFO pops.
    front: AtomicIsize,

    /// The back index. Moved only by the owning worker (push / LIFO pop).
    back: AtomicIsize,

    /// The underlying buffer. Swapped out wholesale on `resize`; old buffers are reclaimed
    /// through `crossbeam_epoch`.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}
123
impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Load the back index, front index, and buffer.
        //
        // `&mut self` proves no other handle exists anymore, so these plain `get_mut`
        // accesses cannot race with anything.
        let b = *self.back.get_mut();
        let f = *self.front.get_mut();

        unsafe {
            // SAFETY: with exclusive access at drop time there are no concurrent readers,
            // so an unprotected epoch load is sound here.
            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());

            // Go through the buffer from front to back and drop all tasks in the queue.
            let mut i = f;
            while i != b {
                buffer.deref().at(i).drop_in_place();
                i = i.wrapping_add(1);
            }

            // Free the memory allocated by the buffer.
            buffer.into_owned().into_box().dealloc();
        }
    }
}
145
/// Worker queue flavor: FIFO or LIFO.
///
/// Determines which end the worker's own `pop` takes from; stealers always take from the
/// front regardless of flavor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor: pop takes from the front, opposite the push end.
    Fifo,

    /// The last-in first-out flavor: pop takes from the back, the same end as push.
    Lifo,
}
155
156/// A worker queue.
157///
158/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
159/// tasks from it. Task schedulers typically create a single worker queue per thread.
160///
161/// # Examples
162///
163/// A FIFO worker:
164///
165/// ```
166/// use crossbeam_deque::{Steal, Worker};
167///
168/// let w = Worker::new_fifo();
169/// let s = w.stealer();
170///
171/// w.push(1);
172/// w.push(2);
173/// w.push(3);
174///
175/// assert_eq!(s.steal(), Steal::Success(1));
176/// assert_eq!(w.pop(), Some(2));
177/// assert_eq!(w.pop(), Some(3));
178/// ```
179///
180/// A LIFO worker:
181///
182/// ```
183/// use crossbeam_deque::{Steal, Worker};
184///
185/// let w = Worker::new_lifo();
186/// let s = w.stealer();
187///
188/// w.push(1);
189/// w.push(2);
190/// w.push(3);
191///
192/// assert_eq!(s.steal(), Steal::Success(1));
193/// assert_eq!(w.pop(), Some(3));
194/// assert_eq!(w.pop(), Some(2));
195/// ```
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// A copy of `inner.buffer` for quick access.
    ///
    /// Only the worker itself replaces this cache (in `resize`), which is why a
    /// non-atomic `Cell` suffices.
    buffer: Cell<Buffer<T>>,

    /// The flavor of the queue.
    flavor: Flavor,
    // Indicates that the worker cannot be shared among threads.
    // _marker: PhantomData<*mut ()>, // !Send + !Sync
}

// NOTE(review): upstream crossbeam makes `Worker` `!Send + !Sync` via the `_marker` field
// commented out above. This fork deliberately removes that restriction. The `Sync` impl is
// only sound if the runtime never invokes `push`/`pop`/`resize` on the same worker from two
// threads concurrently — confirm the scheduler upholds that invariant.
unsafe impl<T: Send> Send for Worker<T> {}
unsafe impl<T: Send> Sync for Worker<T> {}
211
212impl<T> Worker<T> {
213    /// Creates a FIFO worker queue.
214    ///
215    /// Tasks are pushed and popped from opposite ends.
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// use crossbeam_deque::Worker;
221    ///
222    /// let w = Worker::<i32>::new_fifo();
223    /// ```
224    pub fn new_fifo() -> Worker<T> {
225        let buffer = Buffer::alloc(MIN_CAP);
226
227        let inner = Arc::new(CachePadded::new(Inner {
228            front: AtomicIsize::new(0),
229            back: AtomicIsize::new(0),
230            buffer: CachePadded::new(Atomic::new(buffer)),
231        }));
232
233        Worker {
234            inner,
235            buffer: Cell::new(buffer),
236            flavor: Flavor::Fifo,
237            // _marker: PhantomData,
238        }
239    }
240
241    /// Creates a LIFO worker queue.
242    ///
243    /// Tasks are pushed and popped from the same end.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use crossbeam_deque::Worker;
249    ///
250    /// let w = Worker::<i32>::new_lifo();
251    /// ```
252    pub fn new_lifo() -> Worker<T> {
253        let buffer = Buffer::alloc(MIN_CAP);
254
255        let inner = Arc::new(CachePadded::new(Inner {
256            front: AtomicIsize::new(0),
257            back: AtomicIsize::new(0),
258            buffer: CachePadded::new(Atomic::new(buffer)),
259        }));
260
261        Worker {
262            inner,
263            buffer: Cell::new(buffer),
264            flavor: Flavor::Lifo,
265            // _marker: PhantomData,
266        }
267    }
268
269    /// Creates a stealer for this queue.
270    ///
271    /// The returned stealer can be shared among threads and cloned.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use crossbeam_deque::Worker;
277    ///
278    /// let w = Worker::<i32>::new_lifo();
279    /// let s = w.stealer();
280    /// ```
281    pub fn stealer(&self) -> Stealer<T> {
282        Stealer {
283            inner: self.inner.clone(),
284            flavor: self.flavor,
285        }
286    }
287
    /// Resizes the internal buffer to the new capacity of `new_cap`.
    ///
    /// # Safety
    ///
    /// Must only be called by the owning worker (nothing else writes `self.buffer`), and
    /// `new_cap` must be a power of two large enough to hold the current contents.
    #[cold]
    unsafe fn resize(&self, new_cap: usize) {
        // Load the back index, front index, and buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        let buffer = self.buffer.get();

        // Allocate a new buffer and copy data from the old buffer to the new one.
        // Logical indices are preserved; only the modulo mapping to slots changes.
        let new = Buffer::alloc(new_cap);
        let mut i = f;
        while i != b {
            unsafe { ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1) };
            i = i.wrapping_add(1);
        }

        let guard = &epoch::pin();

        // Replace the old buffer with the new one.
        self.buffer.replace(new);
        let old =
            self.inner
                .buffer
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);

        // Destroy the old buffer later — stealers pinned in an earlier epoch may still
        // be reading from it, so it cannot be freed immediately.
        unsafe { guard.defer_unchecked(move || old.into_owned().into_box().dealloc()) };

        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
        // it as soon as possible.
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
            guard.flush();
        }
    }
322
    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
    /// buffer.
    ///
    /// NOTE(review): the doubling loop below can overflow `new_cap` for pathologically large
    /// `reserve_cap`. Callers in this file only pass batch sizes bounded by `MAX_BATCH` or the
    /// queue length, so this is benign in practice — confirm if `reserve` is ever exposed wider.
    fn reserve(&self, reserve_cap: usize) {
        if reserve_cap > 0 {
            // Compute the current length.
            let b = self.inner.back.load(Ordering::Relaxed);
            let f = self.inner.front.load(Ordering::SeqCst);
            let len = b.wrapping_sub(f) as usize;

            // The current capacity.
            let cap = self.buffer.get().cap;

            // Is there enough capacity to push `reserve_cap` tasks?
            if cap - len < reserve_cap {
                // Keep doubling the capacity as much as is needed; doubling preserves the
                // power-of-two invariant required by `Buffer`.
                let mut new_cap = cap * 2;
                while new_cap - len < reserve_cap {
                    new_cap *= 2;
                }

                // Resize the buffer.
                unsafe {
                    // SAFETY: called from the owning worker with a power-of-two capacity
                    // large enough for the current contents.
                    self.resize(new_cap);
                }
            }
        }
    }
350
351    /// Returns `true` if the queue is empty.
352    ///
353    /// ```
354    /// use crossbeam_deque::Worker;
355    ///
356    /// let w = Worker::new_lifo();
357    ///
358    /// assert!(w.is_empty());
359    /// w.push(1);
360    /// assert!(!w.is_empty());
361    /// ```
362    pub fn is_empty(&self) -> bool {
363        let b = self.inner.back.load(Ordering::Relaxed);
364        let f = self.inner.front.load(Ordering::SeqCst);
365        b.wrapping_sub(f) <= 0
366    }
367
368    /// Returns the number of tasks in the deque.
369    ///
370    /// ```
371    /// use crossbeam_deque::Worker;
372    ///
373    /// let w = Worker::new_lifo();
374    ///
375    /// assert_eq!(w.len(), 0);
376    /// w.push(1);
377    /// assert_eq!(w.len(), 1);
378    /// w.push(1);
379    /// assert_eq!(w.len(), 2);
380    /// ```
381    pub fn len(&self) -> usize {
382        let b = self.inner.back.load(Ordering::Relaxed);
383        let f = self.inner.front.load(Ordering::SeqCst);
384        b.wrapping_sub(f).max(0) as usize
385    }
386
    /// Pushes a task into the queue and reports whether the queue appeared empty beforehand.
    ///
    /// Shared implementation backing [`Worker::push`] and [`Worker::push_with_status`].
    /// Returns `true` if `len <= 0` at the start of the call (e.g. so a scheduler can
    /// decide whether to wake a sleeping worker).
    fn push_core(&self, task: T) -> bool {
        // Load the back index, front index, and buffer.
        //
        // `back` is only written by this worker, so a relaxed load is enough; `front`
        // uses `Acquire` to observe stealers' progress before judging fullness.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Acquire);
        let mut buffer = self.buffer.get();

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);
        let was_empty = len <= 0;

        // Is the queue full?
        if len >= buffer.cap as isize {
            // Yes. Grow the underlying buffer.
            unsafe {
                self.resize(2 * buffer.cap);
            }
            // Re-read the cache: `resize` just replaced it.
            buffer = self.buffer.get();
        }

        // Write `task` into the slot.
        unsafe {
            buffer.write(b, MaybeUninit::new(task));
        }

        // Make the task's bytes visible before publishing the new back index.
        atomic::fence(Ordering::Release);

        // Increment the back index.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);

        was_empty
    }
432
    /// Pushes a task into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        let _ = self.push_core(task);
    }
436
    /// Pushes a task into the queue, returning `true` if the queue appeared empty just
    /// before the push (see `push_core`).
    pub fn push_with_status(&self, task: T) -> bool {
        self.push_core(task)
    }
440
    /// Pops a task from the queue, also reporting emptiness.
    ///
    /// Shared implementation backing [`Worker::pop`] and [`Worker::pop_with_status`].
    /// Returns `(task, was_last)`: `was_last` is `true` when the queue appeared empty
    /// after this call — including when nothing was popped because the queue was already
    /// empty. Losing a LIFO race against a stealer returns `(None, false)`.
    fn pop_core(&self) -> (Option<T>, bool) {
        // Load the back and front index.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue empty?
        if len <= 0 {
            return (None, true);
        }

        match self.flavor {
            // Pop from the front of the queue.
            Flavor::Fifo => {
                // Try incrementing the front index to pop the task.
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
                let new_f = f.wrapping_add(1);

                // A concurrent stealer may have emptied the queue meanwhile; if so,
                // undo the increment and report empty.
                if b.wrapping_sub(new_f) < 0 {
                    self.inner.front.store(f, Ordering::Relaxed);
                    return (None, true);
                }

                unsafe {
                    // Read the popped task.
                    let buffer = self.buffer.get();
                    let task = buffer.read(f).assume_init();

                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                        self.resize(buffer.cap / 2);
                    }

                    (Some(task), b.wrapping_sub(new_f) <= 0)
                }
            }

            // Pop from the back of the queue.
            Flavor::Lifo => {
                // Decrement the back index, racing against stealers for the last task.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);

                atomic::fence(Ordering::SeqCst);

                // Load the front index.
                let f = self.inner.front.load(Ordering::Relaxed);

                // Compute the length after the back index was decremented.
                let len = b.wrapping_sub(f);

                if len < 0 {
                    // The queue is empty. Restore the back index to the original value.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    (None, true)
                } else {
                    // Read the task to be popped.
                    let buffer = self.buffer.get();
                    let mut task = unsafe { Some(buffer.read(b)) };
                    let mut was_last = false;

                    // Are we popping the last task from the queue?
                    if len == 0 {
                        // Try incrementing the front index — this is the tie-break
                        // against a stealer contending for the same final task.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                        {
                            // Failed. We didn't pop anything. Reset to `None`.
                            // Dropping the `MaybeUninit` does not drop the task, so the
                            // stealer that won keeps sole ownership of it.
                            task.take();
                            self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                            return (None, false);
                        }

                        // Restore the back index to the original value.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                        was_last = true;
                    } else {
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                            unsafe {
                                self.resize(buffer.cap / 2);
                            }
                        }
                    }

                    let result = task.take().map(|t| unsafe { t.assume_init() });
                    let was_last = was_last && result.is_some();
                    (result, was_last)
                }
            }
        }
    }
558
    /// Pops a task from the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_fifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        self.pop_core().0
    }
562
    /// Pops a task from the queue, additionally reporting whether the queue appeared
    /// empty after the call (see `pop_core` for the exact semantics of the flag).
    pub fn pop_with_status(&self) -> (Option<T>, bool) {
        self.pop_core()
    }
566}
567
568impl<T> fmt::Debug for Worker<T> {
569    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570        f.pad("Worker { .. }")
571    }
572}
573
574/// A stealer handle of a worker queue.
575///
576/// Stealers can be shared among threads.
577///
578/// Task schedulers typically have a single worker queue per worker thread.
579///
580/// # Examples
581///
582/// ```
583/// use crossbeam_deque::{Steal, Worker};
584///
585/// let w = Worker::new_lifo();
586/// w.push(1);
587/// w.push(2);
588///
589/// let s = w.stealer();
590/// assert_eq!(s.steal(), Steal::Success(1));
591/// assert_eq!(s.steal(), Steal::Success(2));
592/// assert_eq!(s.steal(), Steal::Empty);
593/// ```
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue (affects only how batches land in a destination worker).
    flavor: Flavor,
}

// SAFETY: `Stealer` touches `inner` only through atomic operations and epoch-protected
// buffer loads, so it can be sent and shared freely as long as the tasks are `Send`.
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
604
605impl<T> Stealer<T> {
606    /// Returns `true` if the queue is empty.
607    ///
608    /// ```
609    /// use crossbeam_deque::Worker;
610    ///
611    /// let w = Worker::new_lifo();
612    /// let s = w.stealer();
613    ///
614    /// assert!(s.is_empty());
615    /// w.push(1);
616    /// assert!(!s.is_empty());
617    /// ```
618    pub fn is_empty(&self) -> bool {
619        let f = self.inner.front.load(Ordering::Acquire);
620        atomic::fence(Ordering::SeqCst);
621        let b = self.inner.back.load(Ordering::Acquire);
622        b.wrapping_sub(f) <= 0
623    }
624
625    /// Returns the number of tasks in the deque.
626    ///
627    /// ```
628    /// use crossbeam_deque::Worker;
629    ///
630    /// let w = Worker::new_lifo();
631    /// let s = w.stealer();
632    ///
633    /// assert_eq!(s.len(), 0);
634    /// w.push(1);
635    /// assert_eq!(s.len(), 1);
636    /// w.push(2);
637    /// assert_eq!(s.len(), 2);
638    /// ```
639    pub fn len(&self) -> usize {
640        let f = self.inner.front.load(Ordering::Acquire);
641        atomic::fence(Ordering::SeqCst);
642        let b = self.inner.back.load(Ordering::Acquire);
643        b.wrapping_sub(f).max(0) as usize
644    }
645
    /// Steals a task from the front of the queue.
    ///
    /// Shared implementation backing [`Stealer::steal`] and [`Stealer::steal_with_status`].
    /// On success, `was_last` reports whether the stolen task looked like the last one in
    /// the queue at the time of the steal.
    fn steal_core(&self) -> StealStatus<T> {
        // Load the front index.
        let f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        // Pin the epoch so the buffer cannot be reclaimed under us while we read it.
        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        if b.wrapping_sub(f) <= 0 {
            return StealStatus::Empty;
        }

        // Load the buffer and read the task at the front.
        //
        // The read is speculative: if the CAS below fails, the value is simply forgotten
        // (it is a `MaybeUninit`, so no drop runs and no double-ownership arises).
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let task = unsafe { buffer.deref().read(f) };

        // Try incrementing the front index to steal the task.
        // If the buffer has been swapped or the increment fails, we retry.
        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
            || self
                .inner
                .front
                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                .is_err()
        {
            // We didn't steal this task, forget it.
            return StealStatus::Retry;
        }

        let was_last = b.wrapping_sub(f.wrapping_add(1)) <= 0;
        StealStatus::Success {
            task: unsafe { task.assume_init() },
            was_last,
        }
    }
707
708    pub fn steal(&self) -> Steal<T> {
709        match self.steal_core() {
710            StealStatus::Empty => Steal::Empty,
711            StealStatus::Retry => Steal::Retry,
712            StealStatus::Success { task, .. } => Steal::Success(task),
713        }
714    }
715
    /// Steals a task like [`Stealer::steal`], additionally reporting (via the status's
    /// `was_last` field) whether the stolen task appeared to be the last one in the queue.
    pub fn steal_with_status(&self) -> StealStatus<T> {
        self.steal_core()
    }
719
    /// Steals a batch of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit
    /// (`MAX_BATCH`).
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch(&w2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        self.steal_batch_with_limit(dest, MAX_BATCH)
    }
746
    /// Steals no more than `limit` of tasks and pushes them into another worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    /// w1.push(5);
    /// w1.push(6);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch_with_limit(&w2, 2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    ///
    /// w1.push(7);
    /// w1.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
    /// assert_eq!(w2.len(), 3);
    /// ```
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        // Stealing from the queue into itself would deadlock/corrupt; treat it as a no-op.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            if dest.is_empty() {
                return Steal::Empty;
            } else {
                return Steal::Success(());
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        // Pin the epoch so the source buffer cannot be reclaimed while we read from it.
        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch: about half the queue, capped at `limit`.
        let batch_size = cmp::min((len as usize + 1) / 2, limit);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index. Only `dest`'s owner moves `dest_b`,
        // and this method is expected to run on that owner's thread.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer. The copies are
                // speculative: nothing is published until the CAS below succeeds.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        // Reverse while copying so the destination pops tasks in source order.
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the batch.
                // If the buffer has been swapped or the increment fails, we retry.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;

                for i in 0..original_batch_size {
                    // If this is not the first steal, check whether the queue is empty.
                    if i > 0 {
                        // We've already got the current front index. Now execute the fence to
                        // synchronize with other threads.
                        atomic::fence(Ordering::SeqCst);

                        // Load the back index.
                        let b = self.inner.back.load(Ordering::Acquire);

                        // Is the queue empty?
                        if b.wrapping_sub(f) <= 0 {
                            batch_size = i;
                            break;
                        }
                    }

                    // Read the task at the front (speculatively; see `steal_core`).
                    let task = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the stolen task into the destination buffer.
                    unsafe {
                        dest_buffer.write(dest_b, task);
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If we didn't steal anything, the operation needs to be retried.
                if batch_size == 0 {
                    return Steal::Retry;
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed
                // (they were taken one at a time from the source's front).
                if dest.flavor == Flavor::Fifo {
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        // Make the copied tasks visible before publishing the new destination back index.
        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        Steal::Success(())
    }
957
958    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
959    ///
960    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
961    /// steal around half of the tasks in the queue, but also not more than some constant limit.
962    ///
963    /// # Examples
964    ///
965    /// ```
966    /// use crossbeam_deque::{Steal, Worker};
967    ///
968    /// let w1 = Worker::new_fifo();
969    /// w1.push(1);
970    /// w1.push(2);
971    /// w1.push(3);
972    /// w1.push(4);
973    ///
974    /// let s = w1.stealer();
975    /// let w2 = Worker::new_fifo();
976    ///
977    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
978    /// assert_eq!(w2.pop(), Some(2));
979    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        // Delegate to the limited variant using the compile-time default batch cap.
        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
    }
983
984    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
985    /// that worker.
986    ///
987    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
988    /// steal around half of the tasks in the queue, but also not more than the given limit.
989    ///
990    /// # Examples
991    ///
992    /// ```
993    /// use crossbeam_deque::{Steal, Worker};
994    ///
995    /// let w1 = Worker::new_fifo();
996    /// w1.push(1);
997    /// w1.push(2);
998    /// w1.push(3);
999    /// w1.push(4);
1000    /// w1.push(5);
1001    /// w1.push(6);
1002    ///
1003    /// let s = w1.stealer();
1004    /// let w2 = Worker::new_fifo();
1005    ///
1006    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
1007    /// assert_eq!(w2.pop(), Some(2));
1008    /// assert_eq!(w2.pop(), None);
1009    ///
1010    /// w1.push(7);
1011    /// w1.push(8);
1012    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1013    /// // half of the elements are currently popped, but the number of popped elements is considered
1014    /// // an implementation detail that may be changed in the future.
1015    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
1016    /// assert_eq!(w2.pop(), Some(4));
1017    /// assert_eq!(w2.pop(), Some(5));
1018    /// assert_eq!(w2.pop(), None);
1019    /// ```
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
        assert!(limit > 0);
        // Fast path: stealing from ourselves degenerates to a plain pop.
        if Arc::ptr_eq(&self.inner, &dest.inner) {
            match dest.pop() {
                None => return Steal::Empty,
                Some(task) => return Steal::Success(task),
            }
        }

        // Load the front index.
        let mut f = self.inner.front.load(Ordering::Acquire);

        // A SeqCst fence is needed here.
        //
        // If the current thread is already pinned (reentrantly), we must manually issue the
        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
        // have to.
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }

        let guard = &epoch::pin();

        // Load the back index.
        let b = self.inner.back.load(Ordering::Acquire);

        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }

        // Reserve capacity for the stolen batch. One of the `len` tasks is reserved for the
        // popped return value, so only up to half of the rest (capped by `limit - 1`) is
        // transferred into `dest`.
        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
        dest.reserve(batch_size);
        let mut batch_size = batch_size as isize;

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);

        // Load the buffer.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);

        // Read the task at the front. This is the task that will ultimately be returned;
        // it is not owned until the CAS on `front` below succeeds.
        let mut task = unsafe { buffer.deref().read(f) };

        match self.flavor {
            // Steal a batch of tasks from the front at once.
            Flavor::Fifo => {
                // Copy the batch from the source to the destination buffer. Copying happens
                // before the CAS; the copies are only "claimed" if the CAS succeeds.
                match dest.flavor {
                    Flavor::Fifo => {
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(i), task);
                            }
                        }
                    }
                    Flavor::Lifo => {
                        // Write in reverse so the destination's pop order matches.
                        for i in 0..batch_size {
                            unsafe {
                                let task = buffer.deref().read(f.wrapping_add(i + 1));
                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
                            }
                        }
                    }
                }

                // Try incrementing the front index to steal the task.
                // If the buffer has been swapped or the increment fails, we retry.
                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                    || self
                        .inner
                        .front
                        .compare_exchange(
                            f,
                            f.wrapping_add(batch_size + 1),
                            Ordering::SeqCst,
                            Ordering::Relaxed,
                        )
                        .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                dest_b = dest_b.wrapping_add(batch_size);
            }

            // Steal a batch of tasks from the front one by one.
            Flavor::Lifo => {
                // Try incrementing the front index to steal the task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_err()
                {
                    // We didn't steal this task, forget it.
                    return Steal::Retry;
                }

                // Move the front index one step forward.
                f = f.wrapping_add(1);

                // Repeat the same procedure for the batch steals.
                //
                // This loop may modify the batch_size, which triggers a clippy lint warning.
                // Use a new variable to avoid the warning, and to make it clear we aren't
                // modifying the loop exit condition during iteration.
                let original_batch_size = batch_size;
                for i in 0..original_batch_size {
                    // We've already got the current front index. Now execute the fence to
                    // synchronize with other threads.
                    atomic::fence(Ordering::SeqCst);

                    // Load the back index.
                    let b = self.inner.back.load(Ordering::Acquire);

                    // Is the queue empty? If so, settle for the tasks stolen so far.
                    if b.wrapping_sub(f) <= 0 {
                        batch_size = i;
                        break;
                    }

                    // Read the task at the front.
                    let tmp = unsafe { buffer.deref().read(f) };

                    // Try incrementing the front index to steal the task.
                    // If the buffer has been swapped or the increment fails, we retry.
                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
                        || self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                    {
                        // We didn't steal this task, forget it and break from the loop.
                        batch_size = i;
                        break;
                    }

                    // Write the previously stolen task into the destination buffer, and keep
                    // the newly stolen one as the candidate return value.
                    unsafe {
                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
                    }

                    // Move the source front index and the destination back index one step forward.
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                }

                // If stealing into a FIFO queue, stolen tasks need to be reversed.
                if dest.flavor == Flavor::Fifo {
                    for i in 0..batch_size / 2 {
                        unsafe {
                            let i1 = dest_b.wrapping_sub(batch_size - i);
                            let i2 = dest_b.wrapping_sub(i + 1);
                            let t1 = dest_buffer.read(i1);
                            let t2 = dest_buffer.read(i2);
                            dest_buffer.write(i1, t2);
                            dest_buffer.write(i2, t1);
                        }
                    }
                }
            }
        }

        atomic::fence(Ordering::Release);

        // Update the back index in the destination queue.
        //
        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
        // races because it doesn't understand fences.
        dest.inner.back.store(dest_b, Ordering::Release);

        // Return with success.
        // SAFETY: the successful CAS on `front` above gave us ownership of `task`.
        Steal::Success(unsafe { task.assume_init() })
    }
1206}
1207
1208impl<T> Clone for Stealer<T> {
1209    fn clone(&self) -> Stealer<T> {
1210        Stealer {
1211            inner: self.inner.clone(),
1212            flavor: self.flavor,
1213        }
1214    }
1215}
1216
impl<T> fmt::Debug for Stealer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deliberately opaque output; `pad` is used so width/alignment flags still apply.
        f.pad("Stealer { .. }")
    }
}
1222
// Bits indicating the state of a slot:
// * If a task has been written into the slot, `WRITE` is set.
// * If a task has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 64;
// The maximum number of values a block can hold. One index per lap is sacrificed
// so that `offset == BLOCK_CAP` can mark "end of block, successor pending"
// (see `push`/`steal`).
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of an index are reserved for metadata (the `HAS_NEXT` flag).
const SHIFT: usize = 1;
// Metadata bit in the head index: indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
1239
/// A slot in a block.
struct Slot<T> {
    /// The task. Wrapped in `UnsafeCell<MaybeUninit<_>>` because slots start out
    /// zero-initialized (see `Block::new`) and are written/read across threads
    /// under the `state` bit protocol.
    task: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of the `WRITE`, `READ` and `DESTROY` bits.
    state: AtomicUsize,
}
1248
1249impl<T> Slot<T> {
1250    /// Waits until a task is written into the slot.
1251    fn wait_write(&self) {
1252        let backoff = Backoff::new();
1253        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1254            backoff.snooze();
1255        }
1256    }
1257}
1258
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list. Null until a successor is installed.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}
1269
impl<T> Block<T> {
    /// Memory layout of a `Block<T>`, with a compile-time check that it is not
    /// zero-sized (a requirement of `alloc_zeroed`).
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty, fully zero-initialized block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set, spinning with exponential backoff
    /// until another thread publishes the successor block via `next`.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in the first `count` slots (walked in reverse) and
    /// destroys the block once no thread is still reading from any of them.
    ///
    /// # Safety
    ///
    /// `this` must point to a block originally allocated via `Block::new` that no
    /// new reader will start using, and `count` must not exceed `BLOCK_CAP`.
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in (0..count).rev() {
            let slot = unsafe { (*this).slots.get_unchecked(i) };

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(unsafe { Box::from_raw(this) });
    }
}
1330
/// A position in a queue: a packed index plus the block that index falls into.
struct Position<T> {
    /// The index in the queue. The low `SHIFT` bits are reserved for metadata
    /// (see `HAS_NEXT`); the remaining bits form the running slot index.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
1339
/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```
pub struct Injector<T> {
    /// The head of the queue — the position tasks are stolen from. Cache-padded
    /// so that head and tail updates land on separate cache lines.
    head: CachePadded<Position<T>>,

    /// The tail of the queue — the position tasks are pushed at.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
1368
// SAFETY: the queue hands owned values of `T` across threads, hence `T: Send`;
// all shared internal state is accessed through atomics and the slot-state protocol.
unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}
1371
1372impl<T> Default for Injector<T> {
1373    fn default() -> Self {
1374        let block = Box::into_raw(Block::<T>::new());
1375        Self {
1376            head: CachePadded::new(Position {
1377                block: AtomicPtr::new(block),
1378                index: AtomicUsize::new(0),
1379            }),
1380            tail: CachePadded::new(Position {
1381                block: AtomicPtr::new(block),
1382                index: AtomicUsize::new(0),
1383            }),
1384            _marker: PhantomData,
1385        }
1386    }
1387}
1388
1389impl<T> Injector<T> {
1390    /// Creates a new injector queue.
1391    ///
1392    /// # Examples
1393    ///
1394    /// ```
1395    /// use crossbeam_deque::Injector;
1396    ///
1397    /// let q = Injector::<i32>::new();
1398    /// ```
1399    pub fn new() -> Injector<T> {
1400        Self::default()
1401    }
1402
1403    /// Pushes a task into the queue.
1404    ///
1405    /// # Examples
1406    ///
1407    /// ```
1408    /// use crossbeam_deque::Injector;
1409    ///
1410    /// let w = Injector::new();
1411    /// w.push(1);
1412    /// w.push(2);
1413    /// ```
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        // Loop until a slot is successfully claimed via CAS on the tail index.
        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // Skip the sentinel `offset == BLOCK_CAP` position into the next lap.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot and publish it by setting `WRITE`.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(MaybeUninit::new(task));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    // Another thread advanced the tail first; retry with the fresh value.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
1473
1474    /// Steals a task from the queue.
1475    ///
1476    /// # Examples
1477    ///
1478    /// ```
1479    /// use crossbeam_deque::{Injector, Steal};
1480    ///
1481    /// let q = Injector::new();
1482    /// q.push(1);
1483    /// q.push(2);
1484    ///
1485    /// assert_eq!(q.steal(), Steal::Success(1));
1486    /// assert_eq!(q.steal(), Steal::Success(2));
1487    /// assert_eq!(q.steal(), Steal::Empty);
1488    /// ```
    pub fn steal(&self) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        // Spin until the head is not parked at the end-of-block sentinel position
        // (a full block whose successor is still being installed).
        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head + (1 << SHIFT);

        // Only consult the tail when the head doesn't already know a next block exists.
        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // Try moving the head index forward; losing the race means another
        // stealer claimed this slot.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if offset + 1 == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task, waiting for the producer's `WRITE` bit if necessary.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if (offset + 1 == BLOCK_CAP)
                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
            {
                Block::destroy(block, offset);
            }

            Steal::Success(task)
        }
    }
1566
1567    /// Steals a batch of tasks and pushes them into a worker.
1568    ///
1569    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1570    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1571    ///
1572    /// # Examples
1573    ///
1574    /// ```
1575    /// use crossbeam_deque::{Injector, Worker};
1576    ///
1577    /// let q = Injector::new();
1578    /// q.push(1);
1579    /// q.push(2);
1580    /// q.push(3);
1581    /// q.push(4);
1582    ///
1583    /// let w = Worker::new_fifo();
1584    /// let _ = q.steal_batch(&w);
1585    /// assert_eq!(w.pop(), Some(1));
1586    /// assert_eq!(w.pop(), Some(2));
1587    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        // Delegate to the limited variant using the compile-time default batch cap.
        self.steal_batch_with_limit(dest, MAX_BATCH)
    }
1591
    /// Steals no more than `limit` tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    /// q.push(5);
    /// q.push(6);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch_with_limit(&w, 2);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    ///
    /// q.push(7);
    /// q.push(8);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
    /// assert_eq!(w.len(), 3);
    /// ```
    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
        assert!(limit > 0);
        let mut head;
        let mut block;
        let mut offset;

        // Spin until the head is not parked at the end-of-block sentinel position.
        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(limit);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(limit);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(limit);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward; the whole batch is claimed in one CAS.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                Flavor::Fifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue (in reverse order).
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every stolen slot as read; pick up a pending destruction if one
                // was blocked on us.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }
1763
1764    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1765    ///
1766    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1767    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1768    ///
1769    /// # Examples
1770    ///
1771    /// ```
1772    /// use crossbeam_deque::{Injector, Steal, Worker};
1773    ///
1774    /// let q = Injector::new();
1775    /// q.push(1);
1776    /// q.push(2);
1777    /// q.push(3);
1778    /// q.push(4);
1779    ///
1780    /// let w = Worker::new_fifo();
1781    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1782    /// assert_eq!(w.pop(), Some(2));
1783    /// ```
1784    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1785        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1786        // better, but we may change it in the future to be compatible with the same method in Stealer.
1787        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1788    }
1789
    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than the given limit.
    ///
    /// # Panics
    ///
    /// Panics if `limit` is zero: on success one task is always popped and returned, so at least
    /// one task must be allowed to be stolen.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    /// q.push(5);
    /// q.push(6);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    ///
    /// q.push(7);
    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
    /// // half of the elements are currently popped, but the number of popped elements is considered
    /// // an implementation detail that may be changed in the future.
    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
    /// assert_eq!(w.pop(), Some(4));
    /// assert_eq!(w.pop(), Some(5));
    /// assert_eq!(w.pop(), None);
    /// ```
    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
        assert!(limit > 0);
        let mut head;
        let mut block;
        let mut offset;

        // Snapshot a consistent (head index, head block, in-block offset) triple.
        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // `advance` is the total number of tasks claimed: the one popped and returned directly
        // plus the batch pushed into `dest`.
        let advance;

        if new_head & HAS_NEXT == 0 {
            // Full fence so the tail load below observes a value consistent with the head
            // loaded above.
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(limit);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(limit);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(limit);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward. Success makes this thread the exclusive owner of
        // slots `offset..new_offset` in `block`.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        // One of the `advance` claimed tasks is popped and returned directly, so only
        // `advance - 1` of them go into the destination queue.
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        // SAFETY: the successful CAS above reserved slots `offset..new_offset` of `block`
        // exclusively for this thread; no other stealer will read them.
        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task that will be returned to the caller.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read();

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue.
                    // Written in reverse so that popping from the LIFO worker yields tasks in
                    // the same order the injector held them.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task.assume_init())
        }
    }
1966
1967    /// Returns `true` if the queue is empty.
1968    ///
1969    /// # Examples
1970    ///
1971    /// ```
1972    /// use crossbeam_deque::Injector;
1973    ///
1974    /// let q = Injector::new();
1975    ///
1976    /// assert!(q.is_empty());
1977    /// q.push(1);
1978    /// assert!(!q.is_empty());
1979    /// ```
1980    pub fn is_empty(&self) -> bool {
1981        let head = self.head.index.load(Ordering::SeqCst);
1982        let tail = self.tail.index.load(Ordering::SeqCst);
1983        head >> SHIFT == tail >> SHIFT
1984    }
1985
    /// Returns the number of tasks in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let q = Injector::new();
    ///
    /// assert_eq!(q.len(), 0);
    /// q.push(1);
    /// assert_eq!(q.len(), 1);
    /// q.push(1);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits (per-index metadata flags below `SHIFT`).
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                // The last position of each lap is a block boundary rather than a task slot,
                // so an index sitting there is bumped to the start of the next block.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                // Each block in between contributes one boundary position that holds no task.
                return tail - head - tail / LAP;
            }
            // Otherwise a concurrent push moved the tail while we were reading; retry with
            // fresh indices.
        }
    }
2035}
2036
impl<T> Drop for Injector<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the indices and the head block pointer
        // can be read via `get_mut` without any synchronization.
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits (per-index metadata flags below `SHIFT`).
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the task in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.task.get()).assume_init_drop();
                } else {
                    // `offset == BLOCK_CAP` marks a block boundary rather than a task slot:
                    // deallocate the current block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                // Advance by one position; the boundary pseudo-slot is counted like any other.
                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            drop(Box::from_raw(block));
        }
    }
}
2071
2072impl<T> fmt::Debug for Injector<T> {
2073    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2074        f.pad("Worker { .. }")
2075    }
2076}
2077
/// Possible outcomes of a steal operation.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
// `must_use`: discarding a steal result would silently drop a successfully stolen task,
// so the compiler is asked to warn when the value is ignored.
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    Success(T),

    /// The steal operation needs to be retried.
    Retry,
}
/// Outcome of a steal operation that additionally reports whether the stolen task appeared
/// to be the last one.
///
/// This mirrors [`Steal`], but `Success` carries a `was_last` flag alongside the task.
// `must_use` for consistency with `Steal`: discarding this result would silently drop a
// successfully stolen task.
#[must_use]
pub enum StealStatus<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// The steal operation needs to be retried.
    Retry,

    /// A task was successfully stolen.
    Success {
        /// The stolen task.
        task: T,
        /// Whether this task was the last one in the queue at the time of stealing.
        // NOTE(review): semantics inferred from the field name — confirm against the code
        // that constructs this variant.
        was_last: bool,
    },
}
2113
2114impl<T> Steal<T> {
2115    /// Returns `true` if the queue was empty at the time of stealing.
2116    ///
2117    /// # Examples
2118    ///
2119    /// ```
2120    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2121    ///
2122    /// assert!(!Success(7).is_empty());
2123    /// assert!(!Retry::<i32>.is_empty());
2124    ///
2125    /// assert!(Empty::<i32>.is_empty());
2126    /// ```
2127    pub fn is_empty(&self) -> bool {
2128        match self {
2129            Steal::Empty => true,
2130            _ => false,
2131        }
2132    }
2133
2134    /// Returns `true` if at least one task was stolen.
2135    ///
2136    /// # Examples
2137    ///
2138    /// ```
2139    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2140    ///
2141    /// assert!(!Empty::<i32>.is_success());
2142    /// assert!(!Retry::<i32>.is_success());
2143    ///
2144    /// assert!(Success(7).is_success());
2145    /// ```
2146    pub fn is_success(&self) -> bool {
2147        match self {
2148            Steal::Success(_) => true,
2149            _ => false,
2150        }
2151    }
2152
2153    /// Returns `true` if the steal operation needs to be retried.
2154    ///
2155    /// # Examples
2156    ///
2157    /// ```
2158    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2159    ///
2160    /// assert!(!Empty::<i32>.is_retry());
2161    /// assert!(!Success(7).is_retry());
2162    ///
2163    /// assert!(Retry::<i32>.is_retry());
2164    /// ```
2165    pub fn is_retry(&self) -> bool {
2166        match self {
2167            Steal::Retry => true,
2168            _ => false,
2169        }
2170    }
2171
2172    /// Returns the result of the operation, if successful.
2173    ///
2174    /// # Examples
2175    ///
2176    /// ```
2177    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2178    ///
2179    /// assert_eq!(Empty::<i32>.success(), None);
2180    /// assert_eq!(Retry::<i32>.success(), None);
2181    ///
2182    /// assert_eq!(Success(7).success(), Some(7));
2183    /// ```
2184    pub fn success(self) -> Option<T> {
2185        match self {
2186            Steal::Success(res) => Some(res),
2187            _ => None,
2188        }
2189    }
2190
2191    /// If no task was stolen, attempts another steal operation.
2192    ///
2193    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2194    ///
2195    /// * If the second steal resulted in `Success`, it is returned.
2196    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2197    /// * If both resulted in `None`, then `None` is returned.
2198    ///
2199    /// # Examples
2200    ///
2201    /// ```
2202    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2203    ///
2204    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2205    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2206    ///
2207    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2208    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2209    ///
2210    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2211    /// ```
2212    pub fn or_else<F>(self, f: F) -> Steal<T>
2213    where
2214        F: FnOnce() -> Steal<T>,
2215    {
2216        match self {
2217            Steal::Empty => f(),
2218            Steal::Success(_) => self,
2219            Steal::Retry => {
2220                if let Steal::Success(res) = f() {
2221                    Steal::Success(res)
2222                } else {
2223                    Steal::Retry
2224                }
2225            }
2226        }
2227    }
2228}
2229
2230impl<T> fmt::Debug for Steal<T> {
2231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2232        match self {
2233            Steal::Empty => f.pad("Empty"),
2234            Steal::Success(_) => f.pad("Success(..)"),
2235            Steal::Retry => f.pad("Retry"),
2236        }
2237    }
2238}
2239
2240impl<T> FromIterator<Steal<T>> for Steal<T> {
2241    /// Consumes items until a `Success` is found and returns it.
2242    ///
2243    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2244    /// Otherwise, `Empty` is returned.
2245    fn from_iter<I>(iter: I) -> Steal<T>
2246    where
2247        I: IntoIterator<Item = Steal<T>>,
2248    {
2249        let mut retry = false;
2250        for s in iter {
2251            match &s {
2252                Steal::Empty => {}
2253                Steal::Success(_) => return s,
2254                Steal::Retry => retry = true,
2255            }
2256        }
2257
2258        if retry { Steal::Retry } else { Steal::Empty }
2259    }
2260}