bastion_executor/
run_queue.rs

1//! Concurrent work-stealing deques with proportional stealing enabled.
2//!
3//! Adapted from [crossbeam-deque].
4//!
5//! These data structures are most commonly used in work-stealing schedulers. The typical setup
6//! involves a number of threads, each having its own FIFO or LIFO queue (*worker*). There is also
7//! one global FIFO queue (*injector*) and a list of references to *worker* queues that are able to
8//! steal tasks (*stealers*).
9//!
10//! We spawn a new task onto the scheduler by pushing it into the *injector* queue. Each worker
11//! thread waits in a loop until it finds the next task to run and then runs it. To find a task, it
12//! first looks into its local *worker* queue, and then into the *injector* and *stealers*.
13//!
14//! # Queues
15//!
16//! [`Injector`] is a FIFO queue, where tasks are pushed and stolen from opposite ends. It is
17//! shared among threads and is usually the entry point for new tasks.
18//!
19//! [`Worker`] has two constructors:
20//!
21//! * [`new_fifo`] - Creates a FIFO queue, in which tasks are pushed and popped from opposite
22//!   ends.
23//! * [`new_lifo`] - Creates a LIFO queue, in which tasks are pushed and popped from the same
24//!   end.
25//!
26//! Each [`Worker`] is owned by a single thread and supports only push and pop operations.
27//!
28//! Method [`stealer`] creates a [`Stealer`] that may be shared among threads and can only steal
29//! tasks from its [`Worker`]. Tasks are stolen from the end opposite to where they get pushed.
30//!
31//! # Stealing
32//!
33//! Steal operations come in three flavors:
34//!
35//! 1. [`steal`] - Steals one task.
36//! 2. [`steal_batch`] - Steals a batch of tasks and moves them into another worker.
37//! 3. [`steal_batch_and_pop`] - Steals a batch of tasks, moves them into another queue, and pops
38//!    one task from that worker.
39//!
40//! In contrast to push and pop operations, stealing can spuriously fail with [`Steal::Retry`], in
41//! which case the steal operation needs to be retried.
42//!
43//! [`new_fifo`]: Worker::new_fifo
44//! [`new_lifo`]: Worker::new_lifo
45//! [`stealer`]: Worker::stealer
46//! [`steal`]: Stealer::steal
47//! [`steal_batch`]: Stealer::steal_batch
48//! [`steal_batch_and_pop`]: Stealer::steal_batch_and_pop
49use crossbeam_epoch::{self as epoch, Atomic, Owned};
50use crossbeam_utils::{Backoff, CachePadded};
51use std::cell::{Cell, UnsafeCell};
52use std::iter::FromIterator;
53use std::marker::PhantomData;
54use std::mem::{self, ManuallyDrop};
55use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
56use std::sync::Arc;
57use std::{cmp, fmt, ptr};
58
/// Capacity every worker buffer starts with; buffers are never shrunk below it.
const MIN_CAP: usize = 64;
/// Upper bound on the number of tasks moved by one `steal_batch()` or `steal_batch_and_pop()`.
const MAX_BATCH: usize = 32;
/// When a resize produces a buffer of at least this many bytes, thread-local garbage is flushed
/// so that the retired buffer gets deallocated as soon as possible.
const FLUSH_THRESHOLD_BYTES: usize = 1024;
66
/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its capacity - dropping an instance of this struct
/// will *not* deallocate the buffer. Call [`Buffer::dealloc`] explicitly to free the memory.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two.
    cap: usize,
}

// The raw pointer field makes `Buffer` `!Send` by default, so it is opted in by hand.
// NOTE(review): there is no `T: Send` bound here; the `Send`/`Sync` impls on `Worker` and
// `Stealer` carry that bound instead.
unsafe impl<T> Send for Buffer<T> {}

impl<T> Buffer<T> {
    /// Allocates a new buffer with the specified capacity.
    ///
    /// `cap` must be a power of two (checked in debug builds only). The slots are left
    /// uninitialized.
    fn alloc(cap: usize) -> Buffer<T> {
        debug_assert_eq!(cap, cap.next_power_of_two());

        // Wrap the vector in `ManuallyDrop` *before* taking the pointer. Taking the pointer first
        // and then moving the vector into `mem::forget` is the pattern the standard library
        // documentation warns against when transferring ownership of an allocation.
        let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
        let ptr = v.as_mut_ptr();

        Buffer { ptr, cap }
    }

    /// Deallocates the buffer.
    ///
    /// Tasks still stored in the buffer are *not* dropped: the vector is rebuilt with length 0.
    ///
    /// # Safety
    ///
    /// `self` must have been produced by [`Buffer::alloc`], and neither it nor any copy of it
    /// may be used afterwards.
    unsafe fn dealloc(self) {
        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
    }

    /// Returns a pointer to the task at the specified `index`.
    ///
    /// The index is reduced modulo the capacity, so any `isize` (including negative values)
    /// maps to a slot inside the buffer.
    ///
    /// # Safety
    ///
    /// The buffer must still be allocated.
    unsafe fn at(&self, index: isize) -> *mut T {
        // `self.cap` is always a power of two, so masking is equivalent to a modulo.
        self.ptr.offset(index & (self.cap - 1) as isize)
    }

    /// Writes `task` into the specified `index`.
    ///
    /// This method might be concurrently called with another `read` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile write instead.
    ///
    /// # Safety
    ///
    /// The buffer must still be allocated. Any previous value in the slot is overwritten without
    /// being dropped.
    unsafe fn write(&self, index: isize, task: T) {
        ptr::write_volatile(self.at(index), task)
    }

    /// Reads a task from the specified `index`.
    ///
    /// This method might be concurrently called with another `write` at the same index, which is
    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
    /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile read instead.
    ///
    /// # Safety
    ///
    /// The buffer must still be allocated and the slot must hold an initialized value. The value
    /// is bitwise-copied out, so the caller is responsible for making sure it ends up owned (and
    /// eventually dropped) exactly once.
    unsafe fn read(&self, index: isize) -> T {
        ptr::read_volatile(self.at(index))
    }
}

impl<T> Clone for Buffer<T> {
    /// Copies the pointer/capacity pair; the underlying allocation is shared, not duplicated.
    fn clone(&self) -> Buffer<T> {
        *self
    }
}

impl<T> Copy for Buffer<T> {}
135
136/// Internal queue data shared between the worker and stealers.
137///
138/// The implementation is based on the following work:
139///
140/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
141/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
142///    PPoPP 2013.][weak-mem]
143/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
144///    atomics. OOPSLA 2013.][checker]
145///
146/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
147/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
148/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
149struct Inner<T> {
150    /// The front index.
151    front: AtomicIsize,
152
153    /// The back index.
154    back: AtomicIsize,
155
156    /// The underlying buffer.
157    buffer: CachePadded<Atomic<Buffer<T>>>,
158}
159
160impl<T> Drop for Inner<T> {
161    fn drop(&mut self) {
162        // Load the back index, front index, and buffer.
163        let b = self.back.load(Ordering::Relaxed);
164        let f = self.front.load(Ordering::Relaxed);
165
166        unsafe {
167            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
168
169            // Go through the buffer from front to back and drop all tasks in the queue.
170            let mut i = f;
171            while i != b {
172                ptr::drop_in_place(buffer.deref().at(i));
173                i = i.wrapping_add(1);
174            }
175
176            // Free the memory allocated by the buffer.
177            buffer.into_owned().into_box().dealloc();
178        }
179    }
180}
181
/// Selects the end of the queue from which the owning worker pops.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Flavor {
    /// First-in first-out: tasks are popped from the end opposite to where they are pushed.
    Fifo,

    /// Last-in first-out: tasks are popped from the same end they are pushed to.
    Lifo,
}
191
192/// A worker queue.
193///
194/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
195/// tasks from it. Task schedulers typically create a single worker queue per thread.
196pub struct Worker<T> {
197    /// A reference to the inner representation of the queue.
198    inner: Arc<CachePadded<Inner<T>>>,
199
200    /// A copy of `inner.buffer` for quick access.
201    buffer: Cell<Buffer<T>>,
202
203    /// The flavor of the queue.
204    flavor: Flavor,
205
206    /// Indicates that the worker cannot be shared among threads.
207    _marker: PhantomData<*mut ()>, // !Send + !Sync
208}
209
210unsafe impl<T: Send> Send for Worker<T> {}
211
212impl<T> Worker<T> {
213    /// Creates a FIFO worker queue.
214    pub fn new_fifo() -> Worker<T> {
215        let buffer = Buffer::alloc(MIN_CAP);
216
217        let inner = Arc::new(CachePadded::new(Inner {
218            front: AtomicIsize::new(0),
219            back: AtomicIsize::new(0),
220            buffer: CachePadded::new(Atomic::new(buffer)),
221        }));
222
223        Worker {
224            inner,
225            buffer: Cell::new(buffer),
226            flavor: Flavor::Fifo,
227            _marker: PhantomData,
228        }
229    }
230
231    /// Creates a LIFO worker queue.
232    pub fn new_lifo() -> Worker<T> {
233        let buffer = Buffer::alloc(MIN_CAP);
234
235        let inner = Arc::new(CachePadded::new(Inner {
236            front: AtomicIsize::new(0),
237            back: AtomicIsize::new(0),
238            buffer: CachePadded::new(Atomic::new(buffer)),
239        }));
240
241        Worker {
242            inner,
243            buffer: Cell::new(buffer),
244            flavor: Flavor::Lifo,
245            _marker: PhantomData,
246        }
247    }
248
249    /// Get the worker's run queue size
250    pub fn worker_run_queue_size(&self) -> usize {
251        let b = self.inner.back.load(Ordering::Relaxed);
252        let f = self.inner.front.load(Ordering::SeqCst);
253        match b.wrapping_sub(f) {
254            x if x <= 0 => 0_usize,
255            y => y as usize,
256        }
257    }
258
259    /// Creates a stealer for this queue.
260    ///
261    /// The returned stealer can be shared among threads and cloned.
262    pub fn stealer(&self) -> Stealer<T> {
263        Stealer {
264            inner: self.inner.clone(),
265            flavor: self.flavor,
266        }
267    }
268
269    /// Resizes the internal buffer to the new capacity of `new_cap`.
270    #[cold]
271    unsafe fn resize(&self, new_cap: usize) {
272        // Load the back index, front index, and buffer.
273        let b = self.inner.back.load(Ordering::Relaxed);
274        let f = self.inner.front.load(Ordering::Relaxed);
275        let buffer = self.buffer.get();
276
277        // Allocate a new buffer and copy data from the old buffer to the new one.
278        let new = Buffer::alloc(new_cap);
279        let mut i = f;
280        while i != b {
281            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
282            i = i.wrapping_add(1);
283        }
284
285        let guard = &epoch::pin();
286
287        // Replace the old buffer with the new one.
288        self.buffer.replace(new);
289        let old =
290            self.inner
291                .buffer
292                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
293
294        // Destroy the old buffer later.
295        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
296
297        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
298        // it as soon as possible.
299        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
300            guard.flush();
301        }
302    }
303
304    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
305    /// buffer.
306    fn reserve(&self, reserve_cap: usize) {
307        if reserve_cap > 0 {
308            // Compute the current length.
309            let b = self.inner.back.load(Ordering::Relaxed);
310            let f = self.inner.front.load(Ordering::SeqCst);
311            let len = b.wrapping_sub(f) as usize;
312
313            // The current capacity.
314            let cap = self.buffer.get().cap;
315
316            // Is there enough capacity to push `reserve_cap` tasks?
317            if cap.saturating_sub(len) < reserve_cap {
318                // Keep doubling the capacity as much as is needed.
319                let mut new_cap = cap * 2;
320                while new_cap.saturating_sub(len) < reserve_cap {
321                    new_cap = new_cap.wrapping_mul(2);
322                }
323
324                // Resize the buffer.
325                unsafe {
326                    self.resize(new_cap);
327                }
328            }
329        }
330    }
331
332    /// Returns `true` if the queue is empty.
333    pub fn is_empty(&self) -> bool {
334        let b = self.inner.back.load(Ordering::Relaxed);
335        let f = self.inner.front.load(Ordering::SeqCst);
336        b.wrapping_sub(f) <= 0
337    }
338
339    /// Pushes a task into the queue.
340    pub fn push(&self, task: T) {
341        // Load the back index, front index, and buffer.
342        let b = self.inner.back.load(Ordering::Relaxed);
343        let f = self.inner.front.load(Ordering::Acquire);
344        let mut buffer = self.buffer.get();
345
346        // Calculate the length of the queue.
347        let len = b.wrapping_sub(f);
348
349        // Is the queue full?
350        if len >= buffer.cap as isize {
351            // Yes. Grow the underlying buffer.
352            unsafe {
353                self.resize(2 * buffer.cap);
354            }
355            buffer = self.buffer.get();
356        }
357
358        // Write `task` into the slot.
359        unsafe {
360            buffer.write(b, task);
361        }
362
363        atomic::fence(Ordering::Release);
364
365        // Increment the back index.
366        //
367        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
368        // races because it doesn't understand fences.
369        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
370    }
371
372    /// Pops a task from the queue.
373    pub fn pop(&self) -> Option<T> {
374        // Load the back and front index.
375        let b = self.inner.back.load(Ordering::Relaxed);
376        let f = self.inner.front.load(Ordering::Relaxed);
377
378        // Calculate the length of the queue.
379        let len = b.wrapping_sub(f);
380
381        // Is the queue empty?
382        if len <= 0 {
383            return None;
384        }
385
386        match self.flavor {
387            // Pop from the front of the queue.
388            Flavor::Fifo => {
389                // Try incrementing the front index to pop the task.
390                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
391                let new_f = f.wrapping_add(1);
392
393                if b.wrapping_sub(new_f) < 0 {
394                    self.inner.front.store(f, Ordering::Relaxed);
395                    return None;
396                }
397
398                unsafe {
399                    // Read the popped task.
400                    let buffer = self.buffer.get();
401                    let task = buffer.read(f);
402
403                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
404                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
405                        self.resize(buffer.cap / 2);
406                    }
407
408                    Some(task)
409                }
410            }
411
412            // Pop from the back of the queue.
413            Flavor::Lifo => {
414                // Decrement the back index.
415                let b = b.wrapping_sub(1);
416                self.inner.back.store(b, Ordering::Relaxed);
417
418                atomic::fence(Ordering::SeqCst);
419
420                // Load the front index.
421                let f = self.inner.front.load(Ordering::Relaxed);
422
423                // Compute the length after the back index was decremented.
424                let len = b.wrapping_sub(f);
425
426                if len < 0 {
427                    // The queue is empty. Restore the back index to the original task.
428                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
429                    None
430                } else {
431                    // Read the task to be popped.
432                    let buffer = self.buffer.get();
433                    let mut task = unsafe { Some(buffer.read(b)) };
434
435                    // Are we popping the last task from the queue?
436                    if len == 0 {
437                        // Try incrementing the front index.
438                        if self
439                            .inner
440                            .front
441                            .compare_exchange(
442                                f,
443                                f.wrapping_add(1),
444                                Ordering::SeqCst,
445                                Ordering::Relaxed,
446                            )
447                            .is_err()
448                        {
449                            // Failed. We didn't pop anything.
450                            mem::forget(task.take());
451                        }
452
453                        // Restore the back index to the original task.
454                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
455                    } else {
456                        // Shrink the buffer if `len` is less than one fourth of the capacity.
457                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
458                            unsafe {
459                                self.resize(buffer.cap / 2);
460                            }
461                        }
462                    }
463
464                    task
465                }
466            }
467        }
468    }
469}
470
471impl<T> fmt::Debug for Worker<T> {
472    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
473        f.pad("Worker { .. }")
474    }
475}
476
477/// A stealer handle of a worker queue.
478///
479/// Stealers can be shared among threads.
480///
481/// Task schedulers typically have a single worker queue per worker thread.
482pub struct Stealer<T> {
483    /// A reference to the inner representation of the queue.
484    inner: Arc<CachePadded<Inner<T>>>,
485
486    /// The flavor of the queue.
487    flavor: Flavor,
488}
489
490unsafe impl<T: Send> Send for Stealer<T> {}
491unsafe impl<T: Send> Sync for Stealer<T> {}
492
493impl<T> Stealer<T> {
494    /// Returns `true` if the queue is empty.
495    pub fn is_empty(&self) -> bool {
496        let f = self.inner.front.load(Ordering::Acquire);
497        atomic::fence(Ordering::SeqCst);
498        let b = self.inner.back.load(Ordering::Acquire);
499        b.wrapping_sub(f) <= 0
500    }
501
502    /// Returns back the run queue size for the current worker through stealer
503    pub fn run_queue_size(&self) -> usize {
504        let b = self.inner.back.load(Ordering::Acquire);
505        atomic::fence(Ordering::SeqCst);
506        let f = self.inner.front.load(Ordering::Acquire);
507        match b.wrapping_sub(f) {
508            x if x <= 0 => 0_usize,
509            y => y as usize,
510        }
511    }
512
513    /// Steals a task from the queue.
514    pub fn steal(&self) -> Steal<T> {
515        // Load the front index.
516        let f = self.inner.front.load(Ordering::Acquire);
517
518        // A SeqCst fence is needed here.
519        //
520        // If the current thread is already pinned (reentrantly), we must manually issue the
521        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
522        // have to.
523        if epoch::is_pinned() {
524            atomic::fence(Ordering::SeqCst);
525        }
526
527        let guard = &epoch::pin();
528
529        // Load the back index.
530        let b = self.inner.back.load(Ordering::Acquire);
531
532        // Is the queue empty?
533        if b.wrapping_sub(f) <= 0 {
534            return Steal::Empty;
535        }
536
537        // Load the buffer and read the task at the front.
538        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
539        let task = unsafe { buffer.deref().read(f) };
540
541        // Try incrementing the front index to steal the task.
542        if self
543            .inner
544            .front
545            .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
546            .is_err()
547        {
548            // We didn't steal this task, forget it.
549            mem::forget(task);
550            return Steal::Retry;
551        }
552
553        // Return the stolen task.
554        Steal::Success(task)
555    }
556
557    /// Steals a batch of tasks and pushes them into another worker.
558    ///
559    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
560    /// steal around half of the tasks in the queue, but also not more than some constant limit.
561    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
562        // Load the front index.
563        let mut f = self.inner.front.load(Ordering::Acquire);
564
565        // A SeqCst fence is needed here.
566        //
567        // If the current thread is already pinned (reentrantly), we must manually issue the
568        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
569        // have to.
570        if epoch::is_pinned() {
571            atomic::fence(Ordering::SeqCst);
572        }
573
574        let guard = &epoch::pin();
575
576        // Load the back index.
577        let b = self.inner.back.load(Ordering::Acquire);
578
579        // Is the queue empty?
580        let len = b.wrapping_sub(f);
581        if len <= 0 {
582            return Steal::Empty;
583        }
584
585        // Reserve capacity for the stolen batch.
586        let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
587        dest.reserve(batch_size);
588        let mut batch_size = batch_size as isize;
589
590        // Get the destination buffer and back index.
591        let dest_buffer = dest.buffer.get();
592        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
593
594        // Load the buffer.
595        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
596
597        match self.flavor {
598            // Steal a batch of tasks from the front at once.
599            Flavor::Fifo => {
600                // Copy the batch from the source to the destination buffer.
601                match dest.flavor {
602                    Flavor::Fifo => {
603                        for i in 0..batch_size {
604                            unsafe {
605                                let task = buffer.deref().read(f.wrapping_add(i));
606                                dest_buffer.write(dest_b.wrapping_add(i), task);
607                            }
608                        }
609                    }
610                    Flavor::Lifo => {
611                        for i in 0..batch_size {
612                            unsafe {
613                                let task = buffer.deref().read(f.wrapping_add(i));
614                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
615                            }
616                        }
617                    }
618                }
619
620                // Try incrementing the front index to steal the batch.
621                if self
622                    .inner
623                    .front
624                    .compare_exchange(
625                        f,
626                        f.wrapping_add(batch_size),
627                        Ordering::SeqCst,
628                        Ordering::Relaxed,
629                    )
630                    .is_err()
631                {
632                    return Steal::Retry;
633                }
634
635                dest_b = dest_b.wrapping_add(batch_size);
636            }
637
638            // Steal a batch of tasks from the front one by one.
639            Flavor::Lifo => {
640                let size = batch_size;
641                for i in 0..size {
642                    // If this is not the first steal, check whether the queue is empty.
643                    if i > 0 {
644                        // We've already got the current front index. Now execute the fence to
645                        // synchronize with other threads.
646                        atomic::fence(Ordering::SeqCst);
647
648                        // Load the back index.
649                        let b = self.inner.back.load(Ordering::Acquire);
650
651                        // Is the queue empty?
652                        if b.wrapping_sub(f) <= 0 {
653                            batch_size = i;
654                            break;
655                        }
656                    }
657
658                    // Read the task at the front.
659                    let task = unsafe { buffer.deref().read(f) };
660
661                    // Try incrementing the front index to steal the task.
662                    if self
663                        .inner
664                        .front
665                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
666                        .is_err()
667                    {
668                        // We didn't steal this task, forget it and break from the loop.
669                        mem::forget(task);
670                        batch_size = i;
671                        break;
672                    }
673
674                    // Write the stolen task into the destination buffer.
675                    unsafe {
676                        dest_buffer.write(dest_b, task);
677                    }
678
679                    // Move the source front index and the destination back index one step forward.
680                    f = f.wrapping_add(1);
681                    dest_b = dest_b.wrapping_add(1);
682                }
683
684                // If we didn't steal anything, the operation needs to be retried.
685                if batch_size == 0 {
686                    return Steal::Retry;
687                }
688
689                // If stealing into a FIFO queue, stolen tasks need to be reversed.
690                if dest.flavor == Flavor::Fifo {
691                    for i in 0..batch_size / 2 {
692                        unsafe {
693                            let i1 = dest_b.wrapping_sub(batch_size - i);
694                            let i2 = dest_b.wrapping_sub(i + 1);
695                            let t1 = dest_buffer.read(i1);
696                            let t2 = dest_buffer.read(i2);
697                            dest_buffer.write(i1, t2);
698                            dest_buffer.write(i2, t1);
699                        }
700                    }
701                }
702            }
703        }
704
705        atomic::fence(Ordering::Release);
706
707        // Update the back index in the destination queue.
708        //
709        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
710        // races because it doesn't understand fences.
711        dest.inner.back.store(dest_b, Ordering::Release);
712
713        // Return with success.
714        Steal::Success(())
715    }
716
717    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
718    ///
719    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
720    /// steal around half of the tasks in the queue, but also not more than some constant limit.
721    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
722        // Load the front index.
723        let mut f = self.inner.front.load(Ordering::Acquire);
724
725        // A SeqCst fence is needed here.
726        //
727        // If the current thread is already pinned (reentrantly), we must manually issue the
728        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
729        // have to.
730        if epoch::is_pinned() {
731            atomic::fence(Ordering::SeqCst);
732        }
733
734        let guard = &epoch::pin();
735
736        // Load the back index.
737        let b = self.inner.back.load(Ordering::Acquire);
738
739        // Is the queue empty?
740        let len = b.wrapping_sub(f);
741        if len <= 0 {
742            return Steal::Empty;
743        }
744
745        // Reserve capacity for the stolen batch.
746        let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
747        dest.reserve(batch_size);
748        let mut batch_size = batch_size as isize;
749
750        // Get the destination buffer and back index.
751        let dest_buffer = dest.buffer.get();
752        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
753
754        // Load the buffer
755        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
756
757        // Read the task at the front.
758        let mut task = unsafe { buffer.deref().read(f) };
759
760        match self.flavor {
761            // Steal a batch of tasks from the front at once.
762            Flavor::Fifo => {
763                // Copy the batch from the source to the destination buffer.
764                match dest.flavor {
765                    Flavor::Fifo => {
766                        for i in 0..batch_size {
767                            unsafe {
768                                let task = buffer.deref().read(f.wrapping_add(i + 1));
769                                dest_buffer.write(dest_b.wrapping_add(i), task);
770                            }
771                        }
772                    }
773                    Flavor::Lifo => {
774                        for i in 0..batch_size {
775                            unsafe {
776                                let task = buffer.deref().read(f.wrapping_add(i + 1));
777                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
778                            }
779                        }
780                    }
781                }
782
783                // Try incrementing the front index to steal the batch.
784                if self
785                    .inner
786                    .front
787                    .compare_exchange(
788                        f,
789                        f.wrapping_add(batch_size + 1),
790                        Ordering::SeqCst,
791                        Ordering::Relaxed,
792                    )
793                    .is_err()
794                {
795                    // We didn't steal this task, forget it.
796                    mem::forget(task);
797                    return Steal::Retry;
798                }
799
800                dest_b = dest_b.wrapping_add(batch_size);
801            }
802
803            // Steal a batch of tasks from the front one by one.
804            Flavor::Lifo => {
805                // Try incrementing the front index to steal the task.
806                if self
807                    .inner
808                    .front
809                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
810                    .is_err()
811                {
812                    // We didn't steal this task, forget it.
813                    mem::forget(task);
814                    return Steal::Retry;
815                }
816
817                // Move the front index one step forward.
818                f = f.wrapping_add(1);
819
820                // Repeat the same procedure for the batch steals.
821                let size = batch_size;
822                for i in 0..size {
823                    // We've already got the current front index. Now execute the fence to
824                    // synchronize with other threads.
825                    atomic::fence(Ordering::SeqCst);
826
827                    // Load the back index.
828                    let b = self.inner.back.load(Ordering::Acquire);
829
830                    // Is the queue empty?
831                    if b.wrapping_sub(f) <= 0 {
832                        batch_size = i;
833                        break;
834                    }
835
836                    // Read the task at the front.
837                    let tmp = unsafe { buffer.deref().read(f) };
838
839                    // Try incrementing the front index to steal the task.
840                    if self
841                        .inner
842                        .front
843                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
844                        .is_err()
845                    {
846                        // We didn't steal this task, forget it and break from the loop.
847                        mem::forget(tmp);
848                        batch_size = i;
849                        break;
850                    }
851
852                    // Write the previously stolen task into the destination buffer.
853                    unsafe {
854                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
855                    }
856
857                    // Move the source front index and the destination back index one step forward.
858                    f = f.wrapping_add(1);
859                    dest_b = dest_b.wrapping_add(1);
860                }
861
862                // If stealing into a FIFO queue, stolen tasks need to be reversed.
863                if dest.flavor == Flavor::Fifo {
864                    for i in 0..batch_size / 2 {
865                        unsafe {
866                            let i1 = dest_b.wrapping_sub(batch_size - i);
867                            let i2 = dest_b.wrapping_sub(i + 1);
868                            let t1 = dest_buffer.read(i1);
869                            let t2 = dest_buffer.read(i2);
870                            dest_buffer.write(i1, t2);
871                            dest_buffer.write(i2, t1);
872                        }
873                    }
874                }
875            }
876        }
877
878        atomic::fence(Ordering::Release);
879
880        // Update the back index in the destination queue.
881        //
882        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
883        // races because it doesn't understand fences.
884        dest.inner.back.store(dest_b, Ordering::Release);
885
886        // Return with success.
887        Steal::Success(task)
888    }
889
890    ///
891    /// Steals a batch of tasks and amend to the given worker's run queue with the specified amount.
892    /// If requested amount is above the threshold of time-ahead it defaults to time-ahead amount.
893    pub fn steal_batch_and_pop_with_amount(&self, dest: &Worker<T>, amount: usize) -> Steal<T> {
894        // Load the front index.
895        let mut f = self.inner.front.load(Ordering::Acquire);
896
897        // A SeqCst fence is needed here.
898        //
899        // If the current thread is already pinned (reentrantly), we must manually issue the
900        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
901        // have to.
902        if epoch::is_pinned() {
903            atomic::fence(Ordering::SeqCst);
904        }
905
906        let guard = &epoch::pin();
907
908        // Shadow the requested amount;
909        let mut amount = amount;
910
911        // Load the back index.
912        let b = self.inner.back.load(Ordering::Acquire);
913
914        // Is the queue empty?
915        let len = b.wrapping_sub(f);
916        if len <= 0 {
917            return Steal::Empty;
918        }
919
920        let default_time_ahead = (len as usize - 1) / 2;
921        if amount > default_time_ahead as usize {
922            amount = default_time_ahead;
923        }
924
925        // Reserve capacity for the stolen batch.
926        let batch_size = cmp::min(amount, MAX_BATCH - 1);
927        dest.reserve(batch_size);
928        let mut batch_size = batch_size as isize;
929
930        // Get the destination buffer and back index.
931        let dest_buffer = dest.buffer.get();
932        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
933
934        // Load the buffer
935        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
936
937        // Read the task at the front.
938        let mut task = unsafe { buffer.deref().read(f) };
939
940        match self.flavor {
941            // Steal a batch of tasks from the front at once.
942            Flavor::Fifo => {
943                // Copy the batch from the source to the destination buffer.
944                match dest.flavor {
945                    Flavor::Fifo => {
946                        for i in 0..batch_size {
947                            unsafe {
948                                let task = buffer.deref().read(f.wrapping_add(i + 1));
949                                dest_buffer.write(dest_b.wrapping_add(i), task);
950                            }
951                        }
952                    }
953                    Flavor::Lifo => {
954                        for i in 0..batch_size {
955                            unsafe {
956                                let task = buffer.deref().read(f.wrapping_add(i + 1));
957                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
958                            }
959                        }
960                    }
961                }
962
963                // Try incrementing the front index to steal the batch.
964                if self
965                    .inner
966                    .front
967                    .compare_exchange(
968                        f,
969                        f.wrapping_add(batch_size + 1),
970                        Ordering::SeqCst,
971                        Ordering::Relaxed,
972                    )
973                    .is_err()
974                {
975                    // We didn't steal this task, forget it.
976                    mem::forget(task);
977                    return Steal::Retry;
978                }
979
980                dest_b = dest_b.wrapping_add(batch_size);
981            }
982
983            // Steal a batch of tasks from the front one by one.
984            Flavor::Lifo => {
985                // Try incrementing the front index to steal the task.
986                if self
987                    .inner
988                    .front
989                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
990                    .is_err()
991                {
992                    // We didn't steal this task, forget it.
993                    mem::forget(task);
994                    return Steal::Retry;
995                }
996
997                // Move the front index one step forward.
998                f = f.wrapping_add(1);
999
1000                // Repeat the same procedure for the batch steals.
1001                let size = batch_size;
1002                for i in 0..size {
1003                    // We've already got the current front index. Now execute the fence to
1004                    // synchronize with other threads.
1005                    atomic::fence(Ordering::SeqCst);
1006
1007                    // Load the back index.
1008                    let b = self.inner.back.load(Ordering::Acquire);
1009
1010                    // Is the queue empty?
1011                    if b.wrapping_sub(f) <= 0 {
1012                        batch_size = i;
1013                        break;
1014                    }
1015
1016                    // Read the task at the front.
1017                    let tmp = unsafe { buffer.deref().read(f) };
1018
1019                    // Try incrementing the front index to steal the task.
1020                    if self
1021                        .inner
1022                        .front
1023                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1024                        .is_err()
1025                    {
1026                        // We didn't steal this task, forget it and break from the loop.
1027                        mem::forget(tmp);
1028                        batch_size = i;
1029                        break;
1030                    }
1031
1032                    // Write the previously stolen task into the destination buffer.
1033                    unsafe {
1034                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1035                    }
1036
1037                    // Move the source front index and the destination back index one step forward.
1038                    f = f.wrapping_add(1);
1039                    dest_b = dest_b.wrapping_add(1);
1040                }
1041
1042                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1043                if dest.flavor == Flavor::Fifo {
1044                    for i in 0..batch_size / 2 {
1045                        unsafe {
1046                            let i1 = dest_b.wrapping_sub(batch_size - i);
1047                            let i2 = dest_b.wrapping_sub(i + 1);
1048                            let t1 = dest_buffer.read(i1);
1049                            let t2 = dest_buffer.read(i2);
1050                            dest_buffer.write(i1, t2);
1051                            dest_buffer.write(i2, t1);
1052                        }
1053                    }
1054                }
1055            }
1056        }
1057
1058        atomic::fence(Ordering::Release);
1059
1060        // Update the back index in the destination queue.
1061        //
1062        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1063        // races because it doesn't understand fences.
1064        dest.inner.back.store(dest_b, Ordering::Release);
1065
1066        // Return with success.
1067        Steal::Success(task)
1068    }
1069}
1070
1071impl<T> Clone for Stealer<T> {
1072    fn clone(&self) -> Stealer<T> {
1073        Stealer {
1074            inner: self.inner.clone(),
1075            flavor: self.flavor,
1076        }
1077    }
1078}
1079
1080impl<T> fmt::Debug for Stealer<T> {
1081    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1082        f.pad("Stealer { .. }")
1083    }
1084}
1085
// Slot state flags. A slot's `state` word is a bitwise OR of these:

/// Set once a task has been written into the slot.
const WRITE: usize = 0b001;
/// Set once the task has been read from the slot.
const READ: usize = 0b010;
/// Set when the block owning the slot is being destroyed.
const DESTROY: usize = 0b100;

// Block layout and index encoding:

/// Number of indices covered by one block (one "lap").
const LAP: usize = 64;
/// Maximum number of values a block can hold (one less than `LAP`).
const BLOCK_CAP: usize = LAP - 1;
/// Number of low bits of an index that are reserved for metadata.
const SHIFT: usize = 1;
/// Metadata bit indicating that the block is not the last one.
const HAS_NEXT: usize = 1;
1102
/// A slot in a block.
///
/// Pairs the storage for one task with a state word that lets the thread writing the task and
/// the thread reading it coordinate through the `WRITE`, `READ` and `DESTROY` bits.
struct Slot<T> {
    /// The task.
    ///
    /// Wrapped in `ManuallyDrop`, so dropping the slot itself never drops the task.
    task: UnsafeCell<ManuallyDrop<T>>,

    /// The state of the slot: a bitwise OR of `WRITE`, `READ` and `DESTROY`.
    state: AtomicUsize,
}
1111
1112impl<T> Slot<T> {
1113    /// Waits until a task is written into the slot.
1114    fn wait_write(&self) {
1115        let backoff = Backoff::new();
1116        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1117            backoff.snooze();
1118        }
1119    }
1120}
1121
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list; null until a successor has been installed.
    next: AtomicPtr<Block<T>>,

    /// Slots for values, indexed by the offset of a queue index within its lap.
    slots: [Slot<T>; BLOCK_CAP],
}
1132
impl<T> Block<T> {
    /// Creates an empty block: the `next` pointer is null and every slot's state is zero
    /// (none of `WRITE`, `READ`, `DESTROY` set).
    ///
    /// NOTE(review): this zero-initializes the whole `Block<T>`, including the
    /// `ManuallyDrop<T>` payload of every slot. That presumes an all-zero bit pattern is
    /// acceptable for `T` until the slot is written; `mem::zeroed` is documented as undefined
    /// behavior for types where it is not. Consider storing `MaybeUninit<T>` in `Slot::task`
    /// instead (as far as I recall, upstream crossbeam-deque made this change; confirm).
    fn new() -> Block<T> {
        unsafe { mem::zeroed() }
    }

    /// Waits until the next pointer is set.
    ///
    /// Spins with backoff until `next` is non-null and returns that pointer.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in the slots below index `count` that have not been read yet and
    /// destroys the block once no such slot remains.
    ///
    /// Slots `0..count` are visited from the highest index down. If a slot is found whose
    /// `READ` bit is not set, `DESTROY` is set on it and this call returns without freeing the
    /// block; the thread still reading that slot is expected to call `destroy` again when it
    /// is done. If every visited slot already has `READ` set, the block is deallocated.
    ///
    /// # Safety
    ///
    /// `this` is dereferenced and finally turned back into a `Box`, so it must point to a live
    /// block that was allocated through `Box`, and `count` must not exceed `BLOCK_CAP` because
    /// the slots are accessed without bounds checks.
    unsafe fn destroy(this: *mut Block<T>, count: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in (0..count).rev() {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            //
            // The plain load is a cheap pre-check; the `fetch_or` both sets the bit and
            // re-checks `READ` atomically, in case the reader finished in between.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
1171
/// A position in a queue.
///
/// Used for both ends of the injector: an index plus the block that index currently falls in.
struct Position<T> {
    /// The index in the queue.
    ///
    /// The lowest `SHIFT` bits are reserved for metadata (the head index may carry
    /// `HAS_NEXT` there); shifting right by `SHIFT` yields the actual position.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
1180
/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// Tasks are stored in a linked list of fixed-size blocks; `tail` is advanced by `push` and
/// `head` is advanced by the `steal*` methods.
pub struct Injector<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
1195
// The injector moves tasks of type `T` from pushing threads to stealing threads, hence both
// impls require `T: Send` (and nothing more).
//
// SAFETY: NOTE(review): soundness rests on the atomic protocol implemented by `push` and the
// `steal*` methods, not on anything visible in these two lines; keep them in sync with it.
unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}
1198
1199impl<T> Default for Injector<T> {
1200    fn default() -> Self {
1201        let block = Box::into_raw(Box::new(Block::<T>::new()));
1202        Self {
1203            head: CachePadded::new(Position {
1204                block: AtomicPtr::new(block),
1205                index: AtomicUsize::new(0),
1206            }),
1207            tail: CachePadded::new(Position {
1208                block: AtomicPtr::new(block),
1209                index: AtomicUsize::new(0),
1210            }),
1211            _marker: PhantomData,
1212        }
1213    }
1214}
1215
1216impl<T> Injector<T> {
1217    /// Creates a new injector queue.
1218    pub fn new() -> Self {
1219        Self::default()
1220    }
1221
    /// Pushes a task into the queue.
    ///
    /// Claims the next tail index with a compare-and-swap loop, then writes `task` into the
    /// slot belonging to that index and marks the slot with `WRITE`. The thread that claims the
    /// last slot of a block is also the one that installs the following block.
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Pre-allocated successor block, only created when this push may need to install it.
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            //
            // Offset `BLOCK_CAP` has no slot; it only shows up while another thread is in the
            // middle of installing the next block below.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // `new_tail` sits at offset `BLOCK_CAP`; one more step wraps around to
                        // offset 0, i.e. the first slot of the new block.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(ManuallyDrop::new(task));
                    // Publish the write; `wait_write` on the stealing side polls for this bit.
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    // Lost the race (or the weak CAS failed spuriously): retry from the tail
                    // value that was actually observed.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
1282
    /// Steals a task from the queue.
    ///
    /// Returns `Steal::Empty` if the head and tail positions were observed to be equal,
    /// `Steal::Retry` if the attempt to advance the head failed (lost race or spurious failure
    /// of the weak compare-and-swap), and `Steal::Success` with the task otherwise.
    pub fn steal(&self) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head + (1 << SHIFT);

        // Without `HAS_NEXT` in the head, the tail has to be consulted to find out whether there
        // is anything to steal.
        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
            }
        }

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if offset + 1 == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less offset `BLOCK_CAP` so the head lands on offset 0 of `next`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            //
            // The index was claimed above, but the pushing thread may not have finished writing
            // yet, hence the wait.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let m = slot.task.get().read();
            let task = ManuallyDrop::into_inner(m);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if offset + 1 == BLOCK_CAP {
                Block::destroy(block, offset);
            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                Block::destroy(block, offset);
            }

            Steal::Success(task)
        }
    }
1362
    /// Steals a batch of tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// A batch never crosses a block boundary: at most the tasks up to the end of the current
    /// head block are taken in one call.
    ///
    /// Returns `Steal::Empty` if the head and tail positions were observed to be equal,
    /// `Steal::Retry` if the attempt to advance the head failed, and `Steal::Success(())` once
    /// the batch has been moved into `dest`.
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of slots the head is moved forward by, i.e. the number of tasks to steal.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less offset `BLOCK_CAP` so the head lands on offset 0 of `next`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                Flavor::Fifo => {
                    // Same order as in the injector.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let m = slot.task.get().read();
                        let task = ManuallyDrop::into_inner(m);

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Reversed order: the first task read ends up at the highest index.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let m = slot.task.get().read();
                        let task = ManuallyDrop::into_inner(m);

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every slot of the batch as read; if one of them already carries
                // `DESTROY`, take over the destruction of the block.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }
1507
    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// The first claimed task is handed straight back to the caller; only the remaining ones
    /// are written into `dest`. A batch never crosses a block boundary.
    ///
    /// Returns `Steal::Empty` if the head and tail positions were observed to be equal,
    /// `Steal::Retry` if the attempt to advance the head failed, and `Steal::Success` with the
    /// popped task otherwise.
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of slots the head is moved forward by: the popped task plus the batch. The
        // limit is `MAX_BATCH + 1` to account for the popped task.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH + 1);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        //
        // `advance` is at least 1 on every path above (the queue is non-empty and
        // `offset < BLOCK_CAP`), so the subtraction cannot underflow; the `- 1` is the task that
        // is returned instead of being stored in `dest`.
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // Skip the slot-less offset `BLOCK_CAP` so the head lands on offset 0 of `next`.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task.
            //
            // This is the task that gets returned to the caller.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let m = slot.task.get().read();
            let task = ManuallyDrop::into_inner(m);

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let m = slot.task.get().read();
                        let task = ManuallyDrop::into_inner(m);

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue.
                    //
                    // Reversed order: the first task read ends up at the highest index.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let m = slot.task.get().read();
                        let task = ManuallyDrop::into_inner(m);

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every claimed slot (popped task included) as read; if one of them already
                // carries `DESTROY`, take over the destruction of the block.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task)
        }
    }
1658
1659    /// Returns `true` if the queue is empty.
1660    pub fn is_empty(&self) -> bool {
1661        let head = self.head.index.load(Ordering::SeqCst);
1662        let tail = self.tail.index.load(Ordering::SeqCst);
1663        head >> SHIFT == tail >> SHIFT
1664    }
1665}
1666
1667impl<T> Drop for Injector<T> {
1668    fn drop(&mut self) {
1669        let mut head = self.head.index.load(Ordering::Relaxed);
1670        let mut tail = self.tail.index.load(Ordering::Relaxed);
1671        let mut block = self.head.block.load(Ordering::Relaxed);
1672
1673        // Erase the lower bits.
1674        head &= !((1 << SHIFT) - 1);
1675        tail &= !((1 << SHIFT) - 1);
1676
1677        unsafe {
1678            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1679            while head != tail {
1680                let offset = (head >> SHIFT) % LAP;
1681
1682                if offset < BLOCK_CAP {
1683                    // Drop the task in the slot.
1684                    let slot = (*block).slots.get_unchecked(offset);
1685                    ManuallyDrop::drop(&mut *(*slot).task.get());
1686                } else {
1687                    // Deallocate the block and move to the next one.
1688                    let next = (*block).next.load(Ordering::Relaxed);
1689                    drop(Box::from_raw(block));
1690                    block = next;
1691                }
1692
1693                head = head.wrapping_add(1 << SHIFT);
1694            }
1695
1696            // Deallocate the last remaining block.
1697            drop(Box::from_raw(block));
1698        }
1699    }
1700}
1701
1702impl<T> fmt::Debug for Injector<T> {
1703    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1704        f.pad("Worker { .. }")
1705    }
1706}
1707
/// The result of attempting to steal from a queue.
///
/// A steal either finds nothing, yields a task, or asks the caller to try again.
#[must_use]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Steal<T> {
    /// Nothing could be stolen because the queue was empty when the steal ran.
    Empty,

    /// The steal succeeded; the payload is the stolen task.
    Success(T),

    /// The steal did not complete and should be attempted again.
    Retry,
}
1721
1722impl<T> Steal<T> {
1723    /// Returns `true` if the queue was empty at the time of stealing.
1724    pub fn is_empty(&self) -> bool {
1725        matches!(self, Steal::Empty)
1726    }
1727
1728    /// Returns `true` if at least one task was stolen.
1729    pub fn is_success(&self) -> bool {
1730        matches!(self, Steal::Success(_))
1731    }
1732
1733    /// Returns `true` if the steal operation needs to be retried.
1734    pub fn is_retry(&self) -> bool {
1735        matches!(self, Steal::Retry)
1736    }
1737
1738    /// Returns the result of the operation, if successful.
1739    pub fn success(self) -> Option<T> {
1740        match self {
1741            Steal::Success(res) => Some(res),
1742            _ => None,
1743        }
1744    }
1745
1746    /// If no task was stolen, attempts another steal operation.
1747    ///
1748    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
1749    ///
1750    /// * If the second steal resulted in `Success`, it is returned.
1751    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
1752    /// * If both resulted in `None`, then `None` is returned.
1753    pub fn or_else<F>(self, f: F) -> Steal<T>
1754    where
1755        F: FnOnce() -> Steal<T>,
1756    {
1757        match self {
1758            Steal::Empty => f(),
1759            Steal::Success(_) => self,
1760            Steal::Retry => {
1761                if let Steal::Success(res) = f() {
1762                    Steal::Success(res)
1763                } else {
1764                    Steal::Retry
1765                }
1766            }
1767        }
1768    }
1769}
1770
1771impl<T> fmt::Debug for Steal<T> {
1772    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1773        match self {
1774            Steal::Empty => f.pad("Empty"),
1775            Steal::Success(_) => f.pad("Success(..)"),
1776            Steal::Retry => f.pad("Retry"),
1777        }
1778    }
1779}
1780
1781impl<T> FromIterator<Steal<T>> for Steal<T> {
1782    /// Consumes items until a [`Success`] is found and returns it.
1783    ///
1784    /// If no [`Success`] was found, but there was at least one [`Retry`], then returns [`Retry`].
1785    /// Otherwise, [`Empty`] is returned.
1786    ///
1787    /// [`Success`]: Steal::Success
1788    /// [`Retry`]: Steal::Retry
1789    /// [`Empty`]: Steal::Empty
1790    fn from_iter<I>(iter: I) -> Steal<T>
1791    where
1792        I: IntoIterator<Item = Steal<T>>,
1793    {
1794        let mut retry = false;
1795        for s in iter {
1796            match &s {
1797                Steal::Empty => {}
1798                Steal::Success(_) => return s,
1799                Steal::Retry => retry = true,
1800            }
1801        }
1802
1803        if retry {
1804            Steal::Retry
1805        } else {
1806            Steal::Empty
1807        }
1808    }
1809}