bastion_executor/run_queue.rs
1//! Concurrent work-stealing deques with proportional stealing enabled.
2//!
3//! Adapted from [crossbeam-deque].
4//!
5//! These data structures are most commonly used in work-stealing schedulers. The typical setup
6//! involves a number of threads, each having its own FIFO or LIFO queue (*worker*). There is also
7//! one global FIFO queue (*injector*) and a list of references to *worker* queues that are able to
8//! steal tasks (*stealers*).
9//!
10//! We spawn a new task onto the scheduler by pushing it into the *injector* queue. Each worker
11//! thread waits in a loop until it finds the next task to run and then runs it. To find a task, it
12//! first looks into its local *worker* queue, and then into the *injector* and *stealers*.
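//!
//! A rough sketch of that lookup order, closely following the upstream crossbeam-deque docs
//! (the `bastion_executor::run_queue` import path is assumed here):
//!
//! ```ignore
//! use bastion_executor::run_queue::{Injector, Stealer, Worker};
//! use std::iter;
//!
//! fn find_task<T>(
//!     local: &Worker<T>,
//!     global: &Injector<T>,
//!     stealers: &[Stealer<T>],
//! ) -> Option<T> {
//!     // Pop a task from the local queue, if not empty.
//!     local.pop().or_else(|| {
//!         // Otherwise, we need to look for a task elsewhere.
//!         iter::repeat_with(|| {
//!             // Try stealing a batch of tasks from the global queue.
//!             global
//!                 .steal_batch_and_pop(local)
//!                 // Or try stealing a task from one of the other threads.
//!                 .or_else(|| stealers.iter().map(|s| s.steal()).collect())
//!         })
//!         // Loop while no task was stolen and any steal operation needs to be retried.
//!         .find(|s| !s.is_retry())
//!         // Extract the stolen task, if there is one.
//!         .and_then(|s| s.success())
//!     })
//! }
//! ```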
13//!
14//! # Queues
15//!
16//! [`Injector`] is a FIFO queue, where tasks are pushed and stolen from opposite ends. It is
17//! shared among threads and is usually the entry point for new tasks.
18//!
19//! [`Worker`] has two constructors:
20//!
21//! * [`new_fifo`] - Creates a FIFO queue, in which tasks are pushed and popped from opposite
22//! ends.
23//! * [`new_lifo`] - Creates a LIFO queue, in which tasks are pushed and popped from the same
24//! end.
25//!
26//! Each [`Worker`] is owned by a single thread and supports only push and pop operations.
27//!
28//! Method [`stealer`] creates a [`Stealer`] that may be shared among threads and can only steal
29//! tasks from its [`Worker`]. Tasks are stolen from the end opposite to where they get pushed.
30//!
31//! # Stealing
32//!
33//! Steal operations come in three flavors:
34//!
35//! 1. [`steal`] - Steals one task.
36//! 2. [`steal_batch`] - Steals a batch of tasks and moves them into another worker.
//! 3. [`steal_batch_and_pop`] - Steals a batch of tasks, moves them into another worker, and pops
//!    one task from that worker.
39//!
40//! In contrast to push and pop operations, stealing can spuriously fail with [`Steal::Retry`], in
41//! which case the steal operation needs to be retried.
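//!
//! A minimal retry loop over a single [`Stealer`] might look like the sketch below (import path
//! assumed):
//!
//! ```ignore
//! use bastion_executor::run_queue::{Steal, Worker};
//!
//! let w = Worker::new_fifo();
//! w.push(1);
//! let s = w.stealer();
//!
//! // Retry as long as the steal operation spuriously fails.
//! let stolen = loop {
//!     match s.steal() {
//!         Steal::Success(task) => break Some(task),
//!         Steal::Empty => break None,
//!         Steal::Retry => continue,
//!     }
//! };
//! assert_eq!(stolen, Some(1));
//! ```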
42//!
43//! [`new_fifo`]: Worker::new_fifo
44//! [`new_lifo`]: Worker::new_lifo
45//! [`stealer`]: Worker::stealer
46//! [`steal`]: Stealer::steal
47//! [`steal_batch`]: Stealer::steal_batch
48//! [`steal_batch_and_pop`]: Stealer::steal_batch_and_pop
49use crossbeam_epoch::{self as epoch, Atomic, Owned};
50use crossbeam_utils::{Backoff, CachePadded};
51use std::cell::{Cell, UnsafeCell};
52use std::iter::FromIterator;
53use std::marker::PhantomData;
54use std::mem::{self, ManuallyDrop};
55use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
56use std::sync::Arc;
57use std::{cmp, fmt, ptr};
58
59// Minimum buffer capacity.
60const MIN_CAP: usize = 64;
61// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
62const MAX_BATCH: usize = 32;
63// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
64// deallocated as soon as possible.
65const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
66
67/// A buffer that holds tasks in a worker queue.
68///
69/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
70/// *not* deallocate the buffer.
71struct Buffer<T> {
72 /// Pointer to the allocated memory.
73 ptr: *mut T,
74
75 /// Capacity of the buffer. Always a power of two.
76 cap: usize,
77}
78
79unsafe impl<T> Send for Buffer<T> {}
80
81impl<T> Buffer<T> {
82 /// Allocates a new buffer with the specified capacity.
83 fn alloc(cap: usize) -> Buffer<T> {
84 debug_assert_eq!(cap, cap.next_power_of_two());
85
86 let mut v = Vec::with_capacity(cap);
87 let ptr = v.as_mut_ptr();
88 mem::forget(v);
89
90 Buffer { ptr, cap }
91 }
92
93 /// Deallocates the buffer.
94 unsafe fn dealloc(self) {
95 drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
96 }
97
98 /// Returns a pointer to the task at the specified `index`.
99 unsafe fn at(&self, index: isize) -> *mut T {
100 // `self.cap` is always a power of two.
101 self.ptr.offset(index & (self.cap - 1) as isize)
102 }
103
104 /// Writes `task` into the specified `index`.
105 ///
106 /// This method might be concurrently called with another `read` at the same index, which is
107 /// technically speaking a data race and therefore UB. We should use an atomic store here, but
108 /// that would be more expensive and difficult to implement generically for all types `T`.
109 /// Hence, as a hack, we use a volatile write instead.
110 unsafe fn write(&self, index: isize, task: T) {
111 ptr::write_volatile(self.at(index), task)
112 }
113
114 /// Reads a task from the specified `index`.
115 ///
116 /// This method might be concurrently called with another `write` at the same index, which is
117 /// technically speaking a data race and therefore UB. We should use an atomic load here, but
118 /// that would be more expensive and difficult to implement generically for all types `T`.
    /// Hence, as a hack, we use a volatile read instead.
120 unsafe fn read(&self, index: isize) -> T {
121 ptr::read_volatile(self.at(index))
122 }
123}
124
125impl<T> Clone for Buffer<T> {
126 fn clone(&self) -> Buffer<T> {
127 Buffer {
128 ptr: self.ptr,
129 cap: self.cap,
130 }
131 }
132}
133
134impl<T> Copy for Buffer<T> {}
135
136/// Internal queue data shared between the worker and stealers.
137///
138/// The implementation is based on the following work:
139///
140/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
141/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
142/// PPoPP 2013.][weak-mem]
143/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
144/// atomics. OOPSLA 2013.][checker]
145///
146/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
147/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
148/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
149struct Inner<T> {
150 /// The front index.
151 front: AtomicIsize,
152
153 /// The back index.
154 back: AtomicIsize,
155
156 /// The underlying buffer.
157 buffer: CachePadded<Atomic<Buffer<T>>>,
158}
159
160impl<T> Drop for Inner<T> {
161 fn drop(&mut self) {
162 // Load the back index, front index, and buffer.
163 let b = self.back.load(Ordering::Relaxed);
164 let f = self.front.load(Ordering::Relaxed);
165
166 unsafe {
167 let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
168
169 // Go through the buffer from front to back and drop all tasks in the queue.
170 let mut i = f;
171 while i != b {
172 ptr::drop_in_place(buffer.deref().at(i));
173 i = i.wrapping_add(1);
174 }
175
176 // Free the memory allocated by the buffer.
177 buffer.into_owned().into_box().dealloc();
178 }
179 }
180}
181
182/// Worker queue flavor: FIFO or LIFO.
183#[derive(Clone, Copy, Debug, Eq, PartialEq)]
184enum Flavor {
185 /// The first-in first-out flavor.
186 Fifo,
187
188 /// The last-in first-out flavor.
189 Lifo,
190}
191
192/// A worker queue.
193///
194/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
195/// tasks from it. Task schedulers typically create a single worker queue per thread.
196pub struct Worker<T> {
197 /// A reference to the inner representation of the queue.
198 inner: Arc<CachePadded<Inner<T>>>,
199
200 /// A copy of `inner.buffer` for quick access.
201 buffer: Cell<Buffer<T>>,
202
203 /// The flavor of the queue.
204 flavor: Flavor,
205
206 /// Indicates that the worker cannot be shared among threads.
207 _marker: PhantomData<*mut ()>, // !Send + !Sync
208}
209
210unsafe impl<T: Send> Send for Worker<T> {}
211
212impl<T> Worker<T> {
213 /// Creates a FIFO worker queue.
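    ///
    /// In a FIFO worker, tasks come back out in the order they were pushed. A sketch (import
    /// path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::Worker;
    ///
    /// let w = Worker::<i32>::new_fifo();
    /// w.push(1);
    /// w.push(2);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), None);
    /// ```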
214 pub fn new_fifo() -> Worker<T> {
215 let buffer = Buffer::alloc(MIN_CAP);
216
217 let inner = Arc::new(CachePadded::new(Inner {
218 front: AtomicIsize::new(0),
219 back: AtomicIsize::new(0),
220 buffer: CachePadded::new(Atomic::new(buffer)),
221 }));
222
223 Worker {
224 inner,
225 buffer: Cell::new(buffer),
226 flavor: Flavor::Fifo,
227 _marker: PhantomData,
228 }
229 }
230
231 /// Creates a LIFO worker queue.
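    ///
    /// In a LIFO worker, the most recently pushed task is popped first. A sketch (import path
    /// assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::Worker;
    ///
    /// let w = Worker::<i32>::new_lifo();
    /// w.push(1);
    /// w.push(2);
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), None);
    /// ```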
232 pub fn new_lifo() -> Worker<T> {
233 let buffer = Buffer::alloc(MIN_CAP);
234
235 let inner = Arc::new(CachePadded::new(Inner {
236 front: AtomicIsize::new(0),
237 back: AtomicIsize::new(0),
238 buffer: CachePadded::new(Atomic::new(buffer)),
239 }));
240
241 Worker {
242 inner,
243 buffer: Cell::new(buffer),
244 flavor: Flavor::Lifo,
245 _marker: PhantomData,
246 }
247 }
248
    /// Returns the number of tasks currently in the worker's run queue.
250 pub fn worker_run_queue_size(&self) -> usize {
251 let b = self.inner.back.load(Ordering::Relaxed);
252 let f = self.inner.front.load(Ordering::SeqCst);
253 match b.wrapping_sub(f) {
254 x if x <= 0 => 0_usize,
255 y => y as usize,
256 }
257 }
258
259 /// Creates a stealer for this queue.
260 ///
261 /// The returned stealer can be shared among threads and cloned.
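    ///
    /// A single-threaded sketch of the worker/stealer relationship (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::{Steal, Worker};
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// let s = w.stealer();
    /// // Stealing always takes from the front, regardless of the worker's flavor.
    /// assert_eq!(s.steal(), Steal::Success(1));
    /// assert_eq!(s.steal(), Steal::Success(2));
    /// assert_eq!(s.steal(), Steal::Empty);
    /// ```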
262 pub fn stealer(&self) -> Stealer<T> {
263 Stealer {
264 inner: self.inner.clone(),
265 flavor: self.flavor,
266 }
267 }
268
269 /// Resizes the internal buffer to the new capacity of `new_cap`.
270 #[cold]
271 unsafe fn resize(&self, new_cap: usize) {
272 // Load the back index, front index, and buffer.
273 let b = self.inner.back.load(Ordering::Relaxed);
274 let f = self.inner.front.load(Ordering::Relaxed);
275 let buffer = self.buffer.get();
276
277 // Allocate a new buffer and copy data from the old buffer to the new one.
278 let new = Buffer::alloc(new_cap);
279 let mut i = f;
280 while i != b {
281 ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
282 i = i.wrapping_add(1);
283 }
284
285 let guard = &epoch::pin();
286
287 // Replace the old buffer with the new one.
288 self.buffer.replace(new);
289 let old =
290 self.inner
291 .buffer
292 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
293
294 // Destroy the old buffer later.
295 guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
296
297 // If the buffer is very large, then flush the thread-local garbage in order to deallocate
298 // it as soon as possible.
299 if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
300 guard.flush();
301 }
302 }
303
304 /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
305 /// buffer.
306 fn reserve(&self, reserve_cap: usize) {
307 if reserve_cap > 0 {
308 // Compute the current length.
309 let b = self.inner.back.load(Ordering::Relaxed);
310 let f = self.inner.front.load(Ordering::SeqCst);
311 let len = b.wrapping_sub(f) as usize;
312
313 // The current capacity.
314 let cap = self.buffer.get().cap;
315
316 // Is there enough capacity to push `reserve_cap` tasks?
317 if cap.saturating_sub(len) < reserve_cap {
318 // Keep doubling the capacity as much as is needed.
319 let mut new_cap = cap * 2;
320 while new_cap.saturating_sub(len) < reserve_cap {
321 new_cap = new_cap.wrapping_mul(2);
322 }
323
324 // Resize the buffer.
325 unsafe {
326 self.resize(new_cap);
327 }
328 }
329 }
330 }
331
332 /// Returns `true` if the queue is empty.
333 pub fn is_empty(&self) -> bool {
334 let b = self.inner.back.load(Ordering::Relaxed);
335 let f = self.inner.front.load(Ordering::SeqCst);
336 b.wrapping_sub(f) <= 0
337 }
338
339 /// Pushes a task into the queue.
340 pub fn push(&self, task: T) {
341 // Load the back index, front index, and buffer.
342 let b = self.inner.back.load(Ordering::Relaxed);
343 let f = self.inner.front.load(Ordering::Acquire);
344 let mut buffer = self.buffer.get();
345
346 // Calculate the length of the queue.
347 let len = b.wrapping_sub(f);
348
349 // Is the queue full?
350 if len >= buffer.cap as isize {
351 // Yes. Grow the underlying buffer.
352 unsafe {
353 self.resize(2 * buffer.cap);
354 }
355 buffer = self.buffer.get();
356 }
357
358 // Write `task` into the slot.
359 unsafe {
360 buffer.write(b, task);
361 }
362
363 atomic::fence(Ordering::Release);
364
365 // Increment the back index.
366 //
367 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
368 // races because it doesn't understand fences.
369 self.inner.back.store(b.wrapping_add(1), Ordering::Release);
370 }
371
372 /// Pops a task from the queue.
373 pub fn pop(&self) -> Option<T> {
374 // Load the back and front index.
375 let b = self.inner.back.load(Ordering::Relaxed);
376 let f = self.inner.front.load(Ordering::Relaxed);
377
378 // Calculate the length of the queue.
379 let len = b.wrapping_sub(f);
380
381 // Is the queue empty?
382 if len <= 0 {
383 return None;
384 }
385
386 match self.flavor {
387 // Pop from the front of the queue.
388 Flavor::Fifo => {
389 // Try incrementing the front index to pop the task.
390 let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
391 let new_f = f.wrapping_add(1);
392
393 if b.wrapping_sub(new_f) < 0 {
394 self.inner.front.store(f, Ordering::Relaxed);
395 return None;
396 }
397
398 unsafe {
399 // Read the popped task.
400 let buffer = self.buffer.get();
401 let task = buffer.read(f);
402
403 // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
404 if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
405 self.resize(buffer.cap / 2);
406 }
407
408 Some(task)
409 }
410 }
411
412 // Pop from the back of the queue.
413 Flavor::Lifo => {
414 // Decrement the back index.
415 let b = b.wrapping_sub(1);
416 self.inner.back.store(b, Ordering::Relaxed);
417
418 atomic::fence(Ordering::SeqCst);
419
420 // Load the front index.
421 let f = self.inner.front.load(Ordering::Relaxed);
422
423 // Compute the length after the back index was decremented.
424 let len = b.wrapping_sub(f);
425
426 if len < 0 {
                    // The queue is empty. Restore the back index to the original value.
428 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
429 None
430 } else {
431 // Read the task to be popped.
432 let buffer = self.buffer.get();
433 let mut task = unsafe { Some(buffer.read(b)) };
434
435 // Are we popping the last task from the queue?
436 if len == 0 {
437 // Try incrementing the front index.
438 if self
439 .inner
440 .front
441 .compare_exchange(
442 f,
443 f.wrapping_add(1),
444 Ordering::SeqCst,
445 Ordering::Relaxed,
446 )
447 .is_err()
448 {
449 // Failed. We didn't pop anything.
450 mem::forget(task.take());
451 }
452
                        // Restore the back index to the original value.
454 self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
455 } else {
456 // Shrink the buffer if `len` is less than one fourth of the capacity.
457 if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
458 unsafe {
459 self.resize(buffer.cap / 2);
460 }
461 }
462 }
463
464 task
465 }
466 }
467 }
468 }
469}
470
471impl<T> fmt::Debug for Worker<T> {
472 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
473 f.pad("Worker { .. }")
474 }
475}
476
477/// A stealer handle of a worker queue.
478///
479/// Stealers can be shared among threads.
480///
481/// Task schedulers typically have a single worker queue per worker thread.
482pub struct Stealer<T> {
483 /// A reference to the inner representation of the queue.
484 inner: Arc<CachePadded<Inner<T>>>,
485
486 /// The flavor of the queue.
487 flavor: Flavor,
488}
489
490unsafe impl<T: Send> Send for Stealer<T> {}
491unsafe impl<T: Send> Sync for Stealer<T> {}
492
493impl<T> Stealer<T> {
494 /// Returns `true` if the queue is empty.
495 pub fn is_empty(&self) -> bool {
496 let f = self.inner.front.load(Ordering::Acquire);
497 atomic::fence(Ordering::SeqCst);
498 let b = self.inner.back.load(Ordering::Acquire);
499 b.wrapping_sub(f) <= 0
500 }
501
    /// Returns the run queue size of the associated worker, as observed through this stealer.
503 pub fn run_queue_size(&self) -> usize {
504 let b = self.inner.back.load(Ordering::Acquire);
505 atomic::fence(Ordering::SeqCst);
506 let f = self.inner.front.load(Ordering::Acquire);
507 match b.wrapping_sub(f) {
508 x if x <= 0 => 0_usize,
509 y => y as usize,
510 }
511 }
512
513 /// Steals a task from the queue.
514 pub fn steal(&self) -> Steal<T> {
515 // Load the front index.
516 let f = self.inner.front.load(Ordering::Acquire);
517
518 // A SeqCst fence is needed here.
519 //
520 // If the current thread is already pinned (reentrantly), we must manually issue the
521 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
522 // have to.
523 if epoch::is_pinned() {
524 atomic::fence(Ordering::SeqCst);
525 }
526
527 let guard = &epoch::pin();
528
529 // Load the back index.
530 let b = self.inner.back.load(Ordering::Acquire);
531
532 // Is the queue empty?
533 if b.wrapping_sub(f) <= 0 {
534 return Steal::Empty;
535 }
536
537 // Load the buffer and read the task at the front.
538 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
539 let task = unsafe { buffer.deref().read(f) };
540
541 // Try incrementing the front index to steal the task.
542 if self
543 .inner
544 .front
545 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
546 .is_err()
547 {
548 // We didn't steal this task, forget it.
549 mem::forget(task);
550 return Steal::Retry;
551 }
552
553 // Return the stolen task.
554 Steal::Success(task)
555 }
556
557 /// Steals a batch of tasks and pushes them into another worker.
558 ///
559 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
560 /// steal around half of the tasks in the queue, but also not more than some constant limit.
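    ///
    /// For example, this mirrors the upstream crossbeam-deque usage (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::Worker;
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// let _ = s.steal_batch(&w2);
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```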
561 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
562 // Load the front index.
563 let mut f = self.inner.front.load(Ordering::Acquire);
564
565 // A SeqCst fence is needed here.
566 //
567 // If the current thread is already pinned (reentrantly), we must manually issue the
568 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
569 // have to.
570 if epoch::is_pinned() {
571 atomic::fence(Ordering::SeqCst);
572 }
573
574 let guard = &epoch::pin();
575
576 // Load the back index.
577 let b = self.inner.back.load(Ordering::Acquire);
578
579 // Is the queue empty?
580 let len = b.wrapping_sub(f);
581 if len <= 0 {
582 return Steal::Empty;
583 }
584
585 // Reserve capacity for the stolen batch.
586 let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
587 dest.reserve(batch_size);
588 let mut batch_size = batch_size as isize;
589
590 // Get the destination buffer and back index.
591 let dest_buffer = dest.buffer.get();
592 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
593
594 // Load the buffer.
595 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
596
597 match self.flavor {
598 // Steal a batch of tasks from the front at once.
599 Flavor::Fifo => {
600 // Copy the batch from the source to the destination buffer.
601 match dest.flavor {
602 Flavor::Fifo => {
603 for i in 0..batch_size {
604 unsafe {
605 let task = buffer.deref().read(f.wrapping_add(i));
606 dest_buffer.write(dest_b.wrapping_add(i), task);
607 }
608 }
609 }
610 Flavor::Lifo => {
611 for i in 0..batch_size {
612 unsafe {
613 let task = buffer.deref().read(f.wrapping_add(i));
614 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
615 }
616 }
617 }
618 }
619
620 // Try incrementing the front index to steal the batch.
621 if self
622 .inner
623 .front
624 .compare_exchange(
625 f,
626 f.wrapping_add(batch_size),
627 Ordering::SeqCst,
628 Ordering::Relaxed,
629 )
630 .is_err()
631 {
632 return Steal::Retry;
633 }
634
635 dest_b = dest_b.wrapping_add(batch_size);
636 }
637
638 // Steal a batch of tasks from the front one by one.
639 Flavor::Lifo => {
640 let size = batch_size;
641 for i in 0..size {
642 // If this is not the first steal, check whether the queue is empty.
643 if i > 0 {
644 // We've already got the current front index. Now execute the fence to
645 // synchronize with other threads.
646 atomic::fence(Ordering::SeqCst);
647
648 // Load the back index.
649 let b = self.inner.back.load(Ordering::Acquire);
650
651 // Is the queue empty?
652 if b.wrapping_sub(f) <= 0 {
653 batch_size = i;
654 break;
655 }
656 }
657
658 // Read the task at the front.
659 let task = unsafe { buffer.deref().read(f) };
660
661 // Try incrementing the front index to steal the task.
662 if self
663 .inner
664 .front
665 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
666 .is_err()
667 {
668 // We didn't steal this task, forget it and break from the loop.
669 mem::forget(task);
670 batch_size = i;
671 break;
672 }
673
674 // Write the stolen task into the destination buffer.
675 unsafe {
676 dest_buffer.write(dest_b, task);
677 }
678
679 // Move the source front index and the destination back index one step forward.
680 f = f.wrapping_add(1);
681 dest_b = dest_b.wrapping_add(1);
682 }
683
684 // If we didn't steal anything, the operation needs to be retried.
685 if batch_size == 0 {
686 return Steal::Retry;
687 }
688
689 // If stealing into a FIFO queue, stolen tasks need to be reversed.
690 if dest.flavor == Flavor::Fifo {
691 for i in 0..batch_size / 2 {
692 unsafe {
693 let i1 = dest_b.wrapping_sub(batch_size - i);
694 let i2 = dest_b.wrapping_sub(i + 1);
695 let t1 = dest_buffer.read(i1);
696 let t2 = dest_buffer.read(i2);
697 dest_buffer.write(i1, t2);
698 dest_buffer.write(i2, t1);
699 }
700 }
701 }
702 }
703 }
704
705 atomic::fence(Ordering::Release);
706
707 // Update the back index in the destination queue.
708 //
709 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
710 // races because it doesn't understand fences.
711 dest.inner.back.store(dest_b, Ordering::Release);
712
713 // Return with success.
714 Steal::Success(())
715 }
716
717 /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
718 ///
719 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
720 /// steal around half of the tasks in the queue, but also not more than some constant limit.
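    ///
    /// For example, this mirrors the upstream crossbeam-deque usage (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// ```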
721 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
722 // Load the front index.
723 let mut f = self.inner.front.load(Ordering::Acquire);
724
725 // A SeqCst fence is needed here.
726 //
727 // If the current thread is already pinned (reentrantly), we must manually issue the
728 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
729 // have to.
730 if epoch::is_pinned() {
731 atomic::fence(Ordering::SeqCst);
732 }
733
734 let guard = &epoch::pin();
735
736 // Load the back index.
737 let b = self.inner.back.load(Ordering::Acquire);
738
739 // Is the queue empty?
740 let len = b.wrapping_sub(f);
741 if len <= 0 {
742 return Steal::Empty;
743 }
744
745 // Reserve capacity for the stolen batch.
746 let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
747 dest.reserve(batch_size);
748 let mut batch_size = batch_size as isize;
749
750 // Get the destination buffer and back index.
751 let dest_buffer = dest.buffer.get();
752 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
753
754 // Load the buffer
755 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
756
757 // Read the task at the front.
758 let mut task = unsafe { buffer.deref().read(f) };
759
760 match self.flavor {
761 // Steal a batch of tasks from the front at once.
762 Flavor::Fifo => {
763 // Copy the batch from the source to the destination buffer.
764 match dest.flavor {
765 Flavor::Fifo => {
766 for i in 0..batch_size {
767 unsafe {
768 let task = buffer.deref().read(f.wrapping_add(i + 1));
769 dest_buffer.write(dest_b.wrapping_add(i), task);
770 }
771 }
772 }
773 Flavor::Lifo => {
774 for i in 0..batch_size {
775 unsafe {
776 let task = buffer.deref().read(f.wrapping_add(i + 1));
777 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
778 }
779 }
780 }
781 }
782
783 // Try incrementing the front index to steal the batch.
784 if self
785 .inner
786 .front
787 .compare_exchange(
788 f,
789 f.wrapping_add(batch_size + 1),
790 Ordering::SeqCst,
791 Ordering::Relaxed,
792 )
793 .is_err()
794 {
795 // We didn't steal this task, forget it.
796 mem::forget(task);
797 return Steal::Retry;
798 }
799
800 dest_b = dest_b.wrapping_add(batch_size);
801 }
802
803 // Steal a batch of tasks from the front one by one.
804 Flavor::Lifo => {
805 // Try incrementing the front index to steal the task.
806 if self
807 .inner
808 .front
809 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
810 .is_err()
811 {
812 // We didn't steal this task, forget it.
813 mem::forget(task);
814 return Steal::Retry;
815 }
816
817 // Move the front index one step forward.
818 f = f.wrapping_add(1);
819
820 // Repeat the same procedure for the batch steals.
821 let size = batch_size;
822 for i in 0..size {
823 // We've already got the current front index. Now execute the fence to
824 // synchronize with other threads.
825 atomic::fence(Ordering::SeqCst);
826
827 // Load the back index.
828 let b = self.inner.back.load(Ordering::Acquire);
829
830 // Is the queue empty?
831 if b.wrapping_sub(f) <= 0 {
832 batch_size = i;
833 break;
834 }
835
836 // Read the task at the front.
837 let tmp = unsafe { buffer.deref().read(f) };
838
839 // Try incrementing the front index to steal the task.
840 if self
841 .inner
842 .front
843 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
844 .is_err()
845 {
846 // We didn't steal this task, forget it and break from the loop.
847 mem::forget(tmp);
848 batch_size = i;
849 break;
850 }
851
852 // Write the previously stolen task into the destination buffer.
853 unsafe {
854 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
855 }
856
857 // Move the source front index and the destination back index one step forward.
858 f = f.wrapping_add(1);
859 dest_b = dest_b.wrapping_add(1);
860 }
861
862 // If stealing into a FIFO queue, stolen tasks need to be reversed.
863 if dest.flavor == Flavor::Fifo {
864 for i in 0..batch_size / 2 {
865 unsafe {
866 let i1 = dest_b.wrapping_sub(batch_size - i);
867 let i2 = dest_b.wrapping_sub(i + 1);
868 let t1 = dest_buffer.read(i1);
869 let t2 = dest_buffer.read(i2);
870 dest_buffer.write(i1, t2);
871 dest_buffer.write(i2, t1);
872 }
873 }
874 }
875 }
876 }
877
878 atomic::fence(Ordering::Release);
879
880 // Update the back index in the destination queue.
881 //
882 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
883 // races because it doesn't understand fences.
884 dest.inner.back.store(dest_b, Ordering::Release);
885
886 // Return with success.
887 Steal::Success(task)
888 }
889
    /// Steals a batch of tasks and appends them to the given worker's run queue, stealing at
    /// most `amount` tasks.
    ///
    /// If the requested amount is above the time-ahead threshold (roughly half of the tasks
    /// available in this queue), it defaults to the time-ahead amount.
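    ///
    /// For example, requesting at most two tasks (a sketch, not taken from the original docs;
    /// import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::{Steal, Worker};
    ///
    /// let w1 = Worker::new_fifo();
    /// for task in 0..8 {
    ///     w1.push(task);
    /// }
    ///
    /// let s = w1.stealer();
    /// let w2 = Worker::new_fifo();
    ///
    /// assert_eq!(s.steal_batch_and_pop_with_amount(&w2, 2), Steal::Success(0));
    /// assert_eq!(w2.pop(), Some(1));
    /// assert_eq!(w2.pop(), Some(2));
    /// assert_eq!(w2.pop(), None);
    /// ```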
893 pub fn steal_batch_and_pop_with_amount(&self, dest: &Worker<T>, amount: usize) -> Steal<T> {
894 // Load the front index.
895 let mut f = self.inner.front.load(Ordering::Acquire);
896
897 // A SeqCst fence is needed here.
898 //
899 // If the current thread is already pinned (reentrantly), we must manually issue the
900 // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
901 // have to.
902 if epoch::is_pinned() {
903 atomic::fence(Ordering::SeqCst);
904 }
905
906 let guard = &epoch::pin();
907
        // Shadow the requested amount so it can be capped below.
909 let mut amount = amount;
910
911 // Load the back index.
912 let b = self.inner.back.load(Ordering::Acquire);
913
914 // Is the queue empty?
915 let len = b.wrapping_sub(f);
916 if len <= 0 {
917 return Steal::Empty;
918 }
919
920 let default_time_ahead = (len as usize - 1) / 2;
921 if amount > default_time_ahead as usize {
922 amount = default_time_ahead;
923 }
924
925 // Reserve capacity for the stolen batch.
926 let batch_size = cmp::min(amount, MAX_BATCH - 1);
927 dest.reserve(batch_size);
928 let mut batch_size = batch_size as isize;
929
930 // Get the destination buffer and back index.
931 let dest_buffer = dest.buffer.get();
932 let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
933
934 // Load the buffer
935 let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
936
937 // Read the task at the front.
938 let mut task = unsafe { buffer.deref().read(f) };
939
940 match self.flavor {
941 // Steal a batch of tasks from the front at once.
942 Flavor::Fifo => {
943 // Copy the batch from the source to the destination buffer.
944 match dest.flavor {
945 Flavor::Fifo => {
946 for i in 0..batch_size {
947 unsafe {
948 let task = buffer.deref().read(f.wrapping_add(i + 1));
949 dest_buffer.write(dest_b.wrapping_add(i), task);
950 }
951 }
952 }
953 Flavor::Lifo => {
954 for i in 0..batch_size {
955 unsafe {
956 let task = buffer.deref().read(f.wrapping_add(i + 1));
957 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
958 }
959 }
960 }
961 }
962
963 // Try incrementing the front index to steal the batch.
964 if self
965 .inner
966 .front
967 .compare_exchange(
968 f,
969 f.wrapping_add(batch_size + 1),
970 Ordering::SeqCst,
971 Ordering::Relaxed,
972 )
973 .is_err()
974 {
975 // We didn't steal this task, forget it.
976 mem::forget(task);
977 return Steal::Retry;
978 }
979
980 dest_b = dest_b.wrapping_add(batch_size);
981 }
982
983 // Steal a batch of tasks from the front one by one.
984 Flavor::Lifo => {
985 // Try incrementing the front index to steal the task.
986 if self
987 .inner
988 .front
989 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
990 .is_err()
991 {
992 // We didn't steal this task, forget it.
993 mem::forget(task);
994 return Steal::Retry;
995 }
996
997 // Move the front index one step forward.
998 f = f.wrapping_add(1);
999
1000 // Repeat the same procedure for the batch steals.
1001 let size = batch_size;
1002 for i in 0..size {
1003 // We've already got the current front index. Now execute the fence to
1004 // synchronize with other threads.
1005 atomic::fence(Ordering::SeqCst);
1006
1007 // Load the back index.
1008 let b = self.inner.back.load(Ordering::Acquire);
1009
1010 // Is the queue empty?
1011 if b.wrapping_sub(f) <= 0 {
1012 batch_size = i;
1013 break;
1014 }
1015
1016 // Read the task at the front.
1017 let tmp = unsafe { buffer.deref().read(f) };
1018
1019 // Try incrementing the front index to steal the task.
1020 if self
1021 .inner
1022 .front
1023 .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1024 .is_err()
1025 {
1026 // We didn't steal this task, forget it and break from the loop.
1027 mem::forget(tmp);
1028 batch_size = i;
1029 break;
1030 }
1031
1032 // Write the previously stolen task into the destination buffer.
1033 unsafe {
1034 dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1035 }
1036
1037 // Move the source front index and the destination back index one step forward.
1038 f = f.wrapping_add(1);
1039 dest_b = dest_b.wrapping_add(1);
1040 }
1041
1042 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1043 if dest.flavor == Flavor::Fifo {
1044 for i in 0..batch_size / 2 {
1045 unsafe {
1046 let i1 = dest_b.wrapping_sub(batch_size - i);
1047 let i2 = dest_b.wrapping_sub(i + 1);
1048 let t1 = dest_buffer.read(i1);
1049 let t2 = dest_buffer.read(i2);
1050 dest_buffer.write(i1, t2);
1051 dest_buffer.write(i2, t1);
1052 }
1053 }
1054 }
1055 }
1056 }
1057
1058 atomic::fence(Ordering::Release);
1059
1060 // Update the back index in the destination queue.
1061 //
1062 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1063 // races because it doesn't understand fences.
1064 dest.inner.back.store(dest_b, Ordering::Release);
1065
1066 // Return with success.
1067 Steal::Success(task)
1068 }
1069}
1070
1071impl<T> Clone for Stealer<T> {
1072 fn clone(&self) -> Stealer<T> {
1073 Stealer {
1074 inner: self.inner.clone(),
1075 flavor: self.flavor,
1076 }
1077 }
1078}
1079
1080impl<T> fmt::Debug for Stealer<T> {
1081 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1082 f.pad("Stealer { .. }")
1083 }
1084}
1085
1086// Bits indicating the state of a slot:
1087// * If a task has been written into the slot, `WRITE` is set.
1088// * If a task has been read from the slot, `READ` is set.
1089// * If the block is being destroyed, `DESTROY` is set.
1090const WRITE: usize = 1;
1091const READ: usize = 2;
1092const DESTROY: usize = 4;
1093
1094// Each block covers one "lap" of indices.
1095const LAP: usize = 64;
1096// The maximum number of values a block can hold.
1097const BLOCK_CAP: usize = LAP - 1;
1098// How many lower bits are reserved for metadata.
1099const SHIFT: usize = 1;
1100// Indicates that the block is not the last one.
1101const HAS_NEXT: usize = 1;
1102
1103/// A slot in a block.
1104struct Slot<T> {
1105 /// The task.
1106 task: UnsafeCell<ManuallyDrop<T>>,
1107
1108 /// The state of the slot.
1109 state: AtomicUsize,
1110}
1111
1112impl<T> Slot<T> {
1113 /// Waits until a task is written into the slot.
1114 fn wait_write(&self) {
1115 let backoff = Backoff::new();
1116 while self.state.load(Ordering::Acquire) & WRITE == 0 {
1117 backoff.snooze();
1118 }
1119 }
1120}
1121
1122/// A block in a linked list.
1123///
1124/// Each block in the list can hold up to `BLOCK_CAP` values.
1125struct Block<T> {
1126 /// The next block in the linked list.
1127 next: AtomicPtr<Block<T>>,
1128
1129 /// Slots for values.
1130 slots: [Slot<T>; BLOCK_CAP],
1131}
1132
1133impl<T> Block<T> {
    /// Creates an empty block.
1135 fn new() -> Block<T> {
1136 unsafe { mem::zeroed() }
1137 }
1138
1139 /// Waits until the next pointer is set.
1140 fn wait_next(&self) -> *mut Block<T> {
1141 let backoff = Backoff::new();
1142 loop {
1143 let next = self.next.load(Ordering::Acquire);
1144 if !next.is_null() {
1145 return next;
1146 }
1147 backoff.snooze();
1148 }
1149 }
1150
    /// Sets the `DESTROY` bit in the first `count` slots and destroys the block.
1152 unsafe fn destroy(this: *mut Block<T>, count: usize) {
1153 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1154 // begun destruction of the block.
1155 for i in (0..count).rev() {
1156 let slot = (*this).slots.get_unchecked(i);
1157
1158 // Mark the `DESTROY` bit if a thread is still using the slot.
1159 if slot.state.load(Ordering::Acquire) & READ == 0
1160 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1161 {
1162 // If a thread is still using the slot, it will continue destruction of the block.
1163 return;
1164 }
1165 }
1166
1167 // No thread is using the block, now it is safe to destroy it.
1168 drop(Box::from_raw(this));
1169 }
1170}
1171
1172/// A position in a queue.
1173struct Position<T> {
1174 /// The index in the queue.
1175 index: AtomicUsize,
1176
1177 /// The block in the linked list.
1178 block: AtomicPtr<Block<T>>,
1179}
1180
1181/// An injector queue.
1182///
1183/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1184/// a single injector queue, which is the entry point for new tasks.
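///
/// For example, this mirrors the upstream crossbeam-deque usage (import path assumed):
///
/// ```ignore
/// use bastion_executor::run_queue::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```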
1185pub struct Injector<T> {
1186 /// The head of the queue.
1187 head: CachePadded<Position<T>>,
1188
1189 /// The tail of the queue.
1190 tail: CachePadded<Position<T>>,
1191
    /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1193 _marker: PhantomData<T>,
1194}
1195
1196unsafe impl<T: Send> Send for Injector<T> {}
1197unsafe impl<T: Send> Sync for Injector<T> {}
1198
1199impl<T> Default for Injector<T> {
1200 fn default() -> Self {
1201 let block = Box::into_raw(Box::new(Block::<T>::new()));
1202 Self {
1203 head: CachePadded::new(Position {
1204 block: AtomicPtr::new(block),
1205 index: AtomicUsize::new(0),
1206 }),
1207 tail: CachePadded::new(Position {
1208 block: AtomicPtr::new(block),
1209 index: AtomicUsize::new(0),
1210 }),
1211 _marker: PhantomData,
1212 }
1213 }
1214}
1215
1216impl<T> Injector<T> {
1217 /// Creates a new injector queue.
1218 pub fn new() -> Self {
1219 Self::default()
1220 }
1221
1222 /// Pushes a task into the queue.
1223 pub fn push(&self, task: T) {
1224 let backoff = Backoff::new();
1225 let mut tail = self.tail.index.load(Ordering::Acquire);
1226 let mut block = self.tail.block.load(Ordering::Acquire);
1227 let mut next_block = None;
1228
1229 loop {
1230 // Calculate the offset of the index into the block.
1231 let offset = (tail >> SHIFT) % LAP;
1232
1233 // If we reached the end of the block, wait until the next one is installed.
1234 if offset == BLOCK_CAP {
1235 backoff.snooze();
1236 tail = self.tail.index.load(Ordering::Acquire);
1237 block = self.tail.block.load(Ordering::Acquire);
1238 continue;
1239 }
1240
1241 // If we're going to have to install the next block, allocate it in advance in order to
1242 // make the wait for other threads as short as possible.
1243 if offset + 1 == BLOCK_CAP && next_block.is_none() {
1244 next_block = Some(Box::new(Block::<T>::new()));
1245 }
1246
1247 let new_tail = tail + (1 << SHIFT);
1248
1249 // Try advancing the tail forward.
1250 match self.tail.index.compare_exchange_weak(
1251 tail,
1252 new_tail,
1253 Ordering::SeqCst,
1254 Ordering::Acquire,
1255 ) {
1256 Ok(_) => unsafe {
1257 // If we've reached the end of the block, install the next one.
1258 if offset + 1 == BLOCK_CAP {
1259 let next_block = Box::into_raw(next_block.unwrap());
1260 let next_index = new_tail.wrapping_add(1 << SHIFT);
1261
1262 self.tail.block.store(next_block, Ordering::Release);
1263 self.tail.index.store(next_index, Ordering::Release);
1264 (*block).next.store(next_block, Ordering::Release);
1265 }
1266
1267 // Write the task into the slot.
1268 let slot = (*block).slots.get_unchecked(offset);
1269 slot.task.get().write(ManuallyDrop::new(task));
1270 slot.state.fetch_or(WRITE, Ordering::Release);
1271
1272 return;
1273 },
1274 Err(t) => {
1275 tail = t;
1276 block = self.tail.block.load(Ordering::Acquire);
1277 backoff.spin();
1278 }
1279 }
1280 }
1281 }
1282
1283 /// Steals a task from the queue.
1284 pub fn steal(&self) -> Steal<T> {
1285 let mut head;
1286 let mut block;
1287 let mut offset;
1288
1289 let backoff = Backoff::new();
1290 loop {
1291 head = self.head.index.load(Ordering::Acquire);
1292 block = self.head.block.load(Ordering::Acquire);
1293
1294 // Calculate the offset of the index into the block.
1295 offset = (head >> SHIFT) % LAP;
1296
1297 // If we reached the end of the block, wait until the next one is installed.
1298 if offset == BLOCK_CAP {
1299 backoff.snooze();
1300 } else {
1301 break;
1302 }
1303 }
1304
1305 let mut new_head = head + (1 << SHIFT);
1306
1307 if new_head & HAS_NEXT == 0 {
1308 atomic::fence(Ordering::SeqCst);
1309 let tail = self.tail.index.load(Ordering::Relaxed);
1310
1311 // If the tail equals the head, that means the queue is empty.
1312 if head >> SHIFT == tail >> SHIFT {
1313 return Steal::Empty;
1314 }
1315
1316 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1317 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1318 new_head |= HAS_NEXT;
1319 }
1320 }
1321
1322 // Try moving the head index forward.
1323 if self
1324 .head
1325 .index
1326 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1327 .is_err()
1328 {
1329 return Steal::Retry;
1330 }
1331
1332 unsafe {
1333 // If we've reached the end of the block, move to the next one.
1334 if offset + 1 == BLOCK_CAP {
1335 let next = (*block).wait_next();
1336 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1337 if !(*next).next.load(Ordering::Relaxed).is_null() {
1338 next_index |= HAS_NEXT;
1339 }
1340
1341 self.head.block.store(next, Ordering::Release);
1342 self.head.index.store(next_index, Ordering::Release);
1343 }
1344
1345 // Read the task.
1346 let slot = (*block).slots.get_unchecked(offset);
1347 slot.wait_write();
1348 let m = slot.task.get().read();
1349 let task = ManuallyDrop::into_inner(m);
1350
1351 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1352 // but couldn't because we were busy reading from the slot.
1353 if offset + 1 == BLOCK_CAP {
1354 Block::destroy(block, offset);
1355 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1356 Block::destroy(block, offset);
1357 }
1358
1359 Steal::Success(task)
1360 }
1361 }
1362
1363 /// Steals a batch of tasks and pushes them into a worker.
1364 ///
1365 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1366 /// steal around half of the tasks in the queue, but also not more than some constant limit.
1367 pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1368 let mut head;
1369 let mut block;
1370 let mut offset;
1371
1372 let backoff = Backoff::new();
1373 loop {
1374 head = self.head.index.load(Ordering::Acquire);
1375 block = self.head.block.load(Ordering::Acquire);
1376
1377 // Calculate the offset of the index into the block.
1378 offset = (head >> SHIFT) % LAP;
1379
1380 // If we reached the end of the block, wait until the next one is installed.
1381 if offset == BLOCK_CAP {
1382 backoff.snooze();
1383 } else {
1384 break;
1385 }
1386 }
1387
1388 let mut new_head = head;
1389 let advance;
1390
1391 if new_head & HAS_NEXT == 0 {
1392 atomic::fence(Ordering::SeqCst);
1393 let tail = self.tail.index.load(Ordering::Relaxed);
1394
1395 // If the tail equals the head, that means the queue is empty.
1396 if head >> SHIFT == tail >> SHIFT {
1397 return Steal::Empty;
1398 }
1399
1400 // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1401 // the right batch size to steal.
1402 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1403 new_head |= HAS_NEXT;
1404 // We can steal all tasks till the end of the block.
1405 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1406 } else {
1407 let len = (tail - head) >> SHIFT;
1408 // Steal half of the available tasks.
1409 advance = ((len + 1) / 2).min(MAX_BATCH);
1410 }
1411 } else {
1412 // We can steal all tasks till the end of the block.
1413 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1414 }
1415
1416 new_head += advance << SHIFT;
1417 let new_offset = offset + advance;
1418
1419 // Try moving the head index forward.
1420 if self
1421 .head
1422 .index
1423 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1424 .is_err()
1425 {
1426 return Steal::Retry;
1427 }
1428
1429 // Reserve capacity for the stolen batch.
1430 let batch_size = new_offset - offset;
1431 dest.reserve(batch_size);
1432
1433 // Get the destination buffer and back index.
1434 let dest_buffer = dest.buffer.get();
1435 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1436
1437 unsafe {
1438 // If we've reached the end of the block, move to the next one.
1439 if new_offset == BLOCK_CAP {
1440 let next = (*block).wait_next();
1441 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1442 if !(*next).next.load(Ordering::Relaxed).is_null() {
1443 next_index |= HAS_NEXT;
1444 }
1445
1446 self.head.block.store(next, Ordering::Release);
1447 self.head.index.store(next_index, Ordering::Release);
1448 }
1449
1450 // Copy values from the injector into the destination queue.
1451 match dest.flavor {
1452 Flavor::Fifo => {
1453 for i in 0..batch_size {
1454 // Read the task.
1455 let slot = (*block).slots.get_unchecked(offset + i);
1456 slot.wait_write();
1457 let m = slot.task.get().read();
1458 let task = ManuallyDrop::into_inner(m);
1459
1460 // Write it into the destination queue.
1461 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1462 }
1463 }
1464
1465 Flavor::Lifo => {
1466 for i in 0..batch_size {
1467 // Read the task.
1468 let slot = (*block).slots.get_unchecked(offset + i);
1469 slot.wait_write();
1470 let m = slot.task.get().read();
1471 let task = ManuallyDrop::into_inner(m);
1472
1473 // Write it into the destination queue.
1474 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1475 }
1476 }
1477 }
1478
1479 atomic::fence(Ordering::Release);
1480
1481 // Update the back index in the destination queue.
1482 //
1483 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1484 // data races because it doesn't understand fences.
1485 dest.inner
1486 .back
1487 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1488
1489 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1490 // but couldn't because we were busy reading from the slot.
1491 if new_offset == BLOCK_CAP {
1492 Block::destroy(block, offset);
1493 } else {
1494 for i in offset..new_offset {
1495 let slot = (*block).slots.get_unchecked(i);
1496
1497 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1498 Block::destroy(block, offset);
1499 break;
1500 }
1501 }
1502 }
1503
1504 Steal::Success(())
1505 }
1506 }
1507
1508 /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1509 ///
1510 /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1511 /// steal around half of the tasks in the queue, but also not more than some constant limit.
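    ///
    /// For example (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```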
1512 pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1513 let mut head;
1514 let mut block;
1515 let mut offset;
1516
1517 let backoff = Backoff::new();
1518 loop {
1519 head = self.head.index.load(Ordering::Acquire);
1520 block = self.head.block.load(Ordering::Acquire);
1521
1522 // Calculate the offset of the index into the block.
1523 offset = (head >> SHIFT) % LAP;
1524
1525 // If we reached the end of the block, wait until the next one is installed.
1526 if offset == BLOCK_CAP {
1527 backoff.snooze();
1528 } else {
1529 break;
1530 }
1531 }
1532
1533 let mut new_head = head;
1534 let advance;
1535
1536 if new_head & HAS_NEXT == 0 {
1537 atomic::fence(Ordering::SeqCst);
1538 let tail = self.tail.index.load(Ordering::Relaxed);
1539
1540 // If the tail equals the head, that means the queue is empty.
1541 if head >> SHIFT == tail >> SHIFT {
1542 return Steal::Empty;
1543 }
1544
1545 // If head and tail are not in the same block, set `HAS_NEXT` in head.
1546 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1547 new_head |= HAS_NEXT;
1548 // We can steal all tasks till the end of the block.
1549 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1550 } else {
1551 let len = (tail - head) >> SHIFT;
1552 // Steal half of the available tasks.
1553 advance = ((len + 1) / 2).min(MAX_BATCH + 1);
1554 }
1555 } else {
1556 // We can steal all tasks till the end of the block.
1557 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1558 }
1559
1560 new_head += advance << SHIFT;
1561 let new_offset = offset + advance;
1562
1563 // Try moving the head index forward.
1564 if self
1565 .head
1566 .index
1567 .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1568 .is_err()
1569 {
1570 return Steal::Retry;
1571 }
1572
1573 // Reserve capacity for the stolen batch.
1574 let batch_size = new_offset - offset - 1;
1575 dest.reserve(batch_size);
1576
1577 // Get the destination buffer and back index.
1578 let dest_buffer = dest.buffer.get();
1579 let dest_b = dest.inner.back.load(Ordering::Relaxed);
1580
1581 unsafe {
1582 // If we've reached the end of the block, move to the next one.
1583 if new_offset == BLOCK_CAP {
1584 let next = (*block).wait_next();
1585 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1586 if !(*next).next.load(Ordering::Relaxed).is_null() {
1587 next_index |= HAS_NEXT;
1588 }
1589
1590 self.head.block.store(next, Ordering::Release);
1591 self.head.index.store(next_index, Ordering::Release);
1592 }
1593
1594 // Read the task.
1595 let slot = (*block).slots.get_unchecked(offset);
1596 slot.wait_write();
1597 let m = slot.task.get().read();
1598 let task = ManuallyDrop::into_inner(m);
1599
1600 match dest.flavor {
1601 Flavor::Fifo => {
1602 // Copy values from the injector into the destination queue.
1603 for i in 0..batch_size {
1604 // Read the task.
1605 let slot = (*block).slots.get_unchecked(offset + i + 1);
1606 slot.wait_write();
1607 let m = slot.task.get().read();
1608 let task = ManuallyDrop::into_inner(m);
1609
1610 // Write it into the destination queue.
1611 dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1612 }
1613 }
1614
1615 Flavor::Lifo => {
1616 // Copy values from the injector into the destination queue.
1617 for i in 0..batch_size {
1618 // Read the task.
1619 let slot = (*block).slots.get_unchecked(offset + i + 1);
1620 slot.wait_write();
1621 let m = slot.task.get().read();
1622 let task = ManuallyDrop::into_inner(m);
1623
1624 // Write it into the destination queue.
1625 dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1626 }
1627 }
1628 }
1629
1630 atomic::fence(Ordering::Release);
1631
1632 // Update the back index in the destination queue.
1633 //
1634 // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1635 // data races because it doesn't understand fences.
1636 dest.inner
1637 .back
1638 .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1639
1640 // Destroy the block if we've reached the end, or if another thread wanted to destroy
1641 // but couldn't because we were busy reading from the slot.
1642 if new_offset == BLOCK_CAP {
1643 Block::destroy(block, offset);
1644 } else {
1645 for i in offset..new_offset {
1646 let slot = (*block).slots.get_unchecked(i);
1647
1648 if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1649 Block::destroy(block, offset);
1650 break;
1651 }
1652 }
1653 }
1654
1655 Steal::Success(task)
1656 }
1657 }
1658
1659 /// Returns `true` if the queue is empty.
1660 pub fn is_empty(&self) -> bool {
1661 let head = self.head.index.load(Ordering::SeqCst);
1662 let tail = self.tail.index.load(Ordering::SeqCst);
1663 head >> SHIFT == tail >> SHIFT
1664 }
1665}
1666
1667impl<T> Drop for Injector<T> {
1668 fn drop(&mut self) {
1669 let mut head = self.head.index.load(Ordering::Relaxed);
1670 let mut tail = self.tail.index.load(Ordering::Relaxed);
1671 let mut block = self.head.block.load(Ordering::Relaxed);
1672
1673 // Erase the lower bits.
1674 head &= !((1 << SHIFT) - 1);
1675 tail &= !((1 << SHIFT) - 1);
1676
1677 unsafe {
1678 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1679 while head != tail {
1680 let offset = (head >> SHIFT) % LAP;
1681
1682 if offset < BLOCK_CAP {
1683 // Drop the task in the slot.
1684 let slot = (*block).slots.get_unchecked(offset);
1685 ManuallyDrop::drop(&mut *(*slot).task.get());
1686 } else {
1687 // Deallocate the block and move to the next one.
1688 let next = (*block).next.load(Ordering::Relaxed);
1689 drop(Box::from_raw(block));
1690 block = next;
1691 }
1692
1693 head = head.wrapping_add(1 << SHIFT);
1694 }
1695
1696 // Deallocate the last remaining block.
1697 drop(Box::from_raw(block));
1698 }
1699 }
1700}
1701
1702impl<T> fmt::Debug for Injector<T> {
1703 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("Injector { .. }")
1705 }
1706}
1707
1708/// Possible outcomes of a steal operation.
1709#[must_use]
1710#[derive(PartialEq, Eq, Copy, Clone)]
1711pub enum Steal<T> {
1712 /// The queue was empty at the time of stealing.
1713 Empty,
1714
1715 /// At least one task was successfully stolen.
1716 Success(T),
1717
1718 /// The steal operation needs to be retried.
1719 Retry,
1720}
1721
1722impl<T> Steal<T> {
1723 /// Returns `true` if the queue was empty at the time of stealing.
1724 pub fn is_empty(&self) -> bool {
1725 matches!(self, Steal::Empty)
1726 }
1727
1728 /// Returns `true` if at least one task was stolen.
1729 pub fn is_success(&self) -> bool {
1730 matches!(self, Steal::Success(_))
1731 }
1732
1733 /// Returns `true` if the steal operation needs to be retried.
1734 pub fn is_retry(&self) -> bool {
1735 matches!(self, Steal::Retry)
1736 }
1737
1738 /// Returns the result of the operation, if successful.
1739 pub fn success(self) -> Option<T> {
1740 match self {
1741 Steal::Success(res) => Some(res),
1742 _ => None,
1743 }
1744 }
1745
1746 /// If no task was stolen, attempts another steal operation.
1747 ///
1748 /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
1749 ///
1750 /// * If the second steal resulted in `Success`, it is returned.
1751 /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
    /// * If both steals resulted in `Empty`, then `Empty` is returned.
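    ///
    /// A sketch of chaining two steal results (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::Steal;
    ///
    /// let first: Steal<i32> = Steal::Empty;
    /// let combined = first.or_else(|| Steal::Success(7));
    /// assert_eq!(combined, Steal::Success(7));
    ///
    /// let retried: Steal<i32> = Steal::Retry;
    /// assert_eq!(retried.or_else(|| Steal::Empty), Steal::Retry);
    /// ```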
1753 pub fn or_else<F>(self, f: F) -> Steal<T>
1754 where
1755 F: FnOnce() -> Steal<T>,
1756 {
1757 match self {
1758 Steal::Empty => f(),
1759 Steal::Success(_) => self,
1760 Steal::Retry => {
1761 if let Steal::Success(res) = f() {
1762 Steal::Success(res)
1763 } else {
1764 Steal::Retry
1765 }
1766 }
1767 }
1768 }
1769}
1770
1771impl<T> fmt::Debug for Steal<T> {
1772 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1773 match self {
1774 Steal::Empty => f.pad("Empty"),
1775 Steal::Success(_) => f.pad("Success(..)"),
1776 Steal::Retry => f.pad("Retry"),
1777 }
1778 }
1779}
1780
1781impl<T> FromIterator<Steal<T>> for Steal<T> {
1782 /// Consumes items until a [`Success`] is found and returns it.
1783 ///
1784 /// If no [`Success`] was found, but there was at least one [`Retry`], then returns [`Retry`].
1785 /// Otherwise, [`Empty`] is returned.
1786 ///
1787 /// [`Success`]: Steal::Success
1788 /// [`Retry`]: Steal::Retry
1789 /// [`Empty`]: Steal::Empty
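    ///
    /// A sketch of collecting steal results (import path assumed):
    ///
    /// ```ignore
    /// use bastion_executor::run_queue::Steal;
    ///
    /// let all_empty = vec![Steal::<i32>::Empty, Steal::Empty];
    /// assert_eq!(all_empty.into_iter().collect::<Steal<i32>>(), Steal::Empty);
    ///
    /// let with_retry = vec![Steal::<i32>::Empty, Steal::Retry, Steal::Empty];
    /// assert_eq!(with_retry.into_iter().collect::<Steal<i32>>(), Steal::Retry);
    ///
    /// let with_success = vec![Steal::Empty, Steal::Success(5), Steal::Retry];
    /// assert_eq!(with_success.into_iter().collect::<Steal<i32>>(), Steal::Success(5));
    /// ```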
1790 fn from_iter<I>(iter: I) -> Steal<T>
1791 where
1792 I: IntoIterator<Item = Steal<T>>,
1793 {
1794 let mut retry = false;
1795 for s in iter {
1796 match &s {
1797 Steal::Empty => {}
1798 Steal::Success(_) => return s,
1799 Steal::Retry => retry = true,
1800 }
1801 }
1802
1803 if retry {
1804 Steal::Retry
1805 } else {
1806 Steal::Empty
1807 }
1808 }
1809}