1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::{
cell::UnsafeCell,
ptr,
sync::{
Arc,
atomic::{
AtomicPtr,
Ordering::{AcqRel, Acquire, Relaxed, Release},
},
},
};
use super::{abort, atomic_waker::AtomicWaker, task::Task};
pub(super) enum Dequeue<Fut> {
Data(*const Task<Fut>),
Empty,
Inconsistent,
}
pub(super) struct ReadyToRunQueue<Fut> {
// The waker of the task using `FuturesUnordered`.
pub(super) waker: AtomicWaker,
// Head/tail of the readiness queue
pub(super) head: AtomicPtr<Task<Fut>>,
pub(super) tail: UnsafeCell<*const Task<Fut>>,
pub(super) stub: Arc<Task<Fut>>,
}
/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
impl<Fut> ReadyToRunQueue<Fut> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
pub(super) fn enqueue(&self, task: *const Task<Fut>) {
unsafe {
// debug_assert!((*task).queued.load(Relaxed));
// This action does not require any coordination
(*task).next_ready_to_run.store(ptr::null_mut(), Relaxed);
// Note that these atomic orderings come from 1024cores
let task = task as *mut _;
let prev = self.head.swap(task, AcqRel);
(*prev).next_ready_to_run.store(task, Release);
}
}
/// The dequeue function from the 1024cores intrusive MPSC queue algorithm
///
/// Note that this is unsafe as it required mutual exclusion (only one
/// thread can call this) to be guaranteed elsewhere.
pub(super) unsafe fn dequeue(&self) -> Dequeue<Fut> {
unsafe {
let mut tail = *self.tail.get();
let mut next = (*tail).next_ready_to_run.load(Acquire);
if tail == self.stub() {
if next.is_null() {
return Dequeue::Empty;
}
*self.tail.get() = next;
tail = next;
next = (*next).next_ready_to_run.load(Acquire);
}
if !next.is_null() {
*self.tail.get() = next;
debug_assert!(tail != self.stub());
return Dequeue::Data(tail);
}
if !std::ptr::eq(self.head.load(Acquire), tail) {
return Dequeue::Inconsistent;
}
self.enqueue(self.stub());
next = (*tail).next_ready_to_run.load(Acquire);
if !next.is_null() {
*self.tail.get() = next;
return Dequeue::Data(tail);
}
}
Dequeue::Inconsistent
}
pub(super) fn stub(&self) -> *const Task<Fut> {
Arc::as_ptr(&self.stub)
}
// Clear the queue of tasks.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. This method just pulls out
// tasks and drops their refcounts.
//
// # Safety
//
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
// - The caller **must** guarantee unique access to `self`
pub(crate) unsafe fn clear(&self) {
loop {
unsafe {
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
}
}
}
impl<Fut> Drop for ReadyToRunQueue<Fut> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<Fut>` we need to clear out
// the ready to run queue of tasks if there's anything left in there.
// All tasks have had their futures dropped already by the `FuturesUnordered`
// destructor above, and we have &mut self, so this is safe.
unsafe {
self.clear();
}
}
}