tokio_sync/mpsc/list.rs
1//! A concurrent, lock-free, FIFO list.
2
3use super::block::{self, Block};
4
5use loom::{
6 self,
7 sync::atomic::{AtomicPtr, AtomicUsize},
8};
9
10use std::fmt;
11use std::ptr::NonNull;
12use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
13
/// List queue transmit handle.
///
/// Holds the state that `push` and `close` operate on: a pointer into the
/// block list and the counter from which slot indices are claimed.
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    ///
    /// `find_block` starts its walk from this pointer and may advance it to
    /// a later block while walking.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and an
    /// offset into the block.
    ///
    /// `push` and `close` each claim a slot by incrementing this counter.
    tail_position: AtomicUsize,
}
23
/// List queue receive handle.
///
/// Tracks where the next value is read from (`head` + `index`) and which
/// already-consumed blocks are still waiting to be handed back (`free_head`).
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed.
    head: NonNull<Block<T>>,

    /// Next slot index to process.
    index: usize,

    /// Pointer to the next block pending release.
    ///
    /// `reclaim_blocks` advances this pointer towards `head`, passing each
    /// block it steps over to `Tx::reclaim_block`.
    free_head: NonNull<Block<T>>,
}
35
36pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
37 // Create the initial block shared between the tx and rx halves.
38 let initial_block = Box::new(Block::new(0));
39 let initial_block_ptr = Box::into_raw(initial_block);
40
41 let tx = Tx {
42 block_tail: AtomicPtr::new(initial_block_ptr),
43 tail_position: AtomicUsize::new(0),
44 };
45
46 let head = NonNull::new(initial_block_ptr).unwrap();
47
48 let rx = Rx {
49 head,
50 index: 0,
51 free_head: head,
52 };
53
54 (tx, rx)
55}
56
57impl<T> Tx<T> {
58 /// Push a value into the list.
59 pub(crate) fn push(&self, value: T) {
60 // First, claim a slot for the value. `Acquire` is used here to
61 // synchronize with the `fetch_add` in `reclaim_blocks`.
62 let slot_index = self.tail_position.fetch_add(1, Acquire);
63
64 // Load the current block and write the value
65 let block = self.find_block(slot_index);
66
67 unsafe {
68 // Write the value to the block
69 block.as_ref().write(slot_index, value);
70 }
71 }
72
73 /// Close the send half of the list
74 ///
75 /// Similar process as pushing a value, but instead of writing the value &
76 /// setting the ready flag, the TX_CLOSED flag is set on the block.
77 pub(crate) fn close(&self) {
78 // First, claim a slot for the value. This is the last slot that will be
79 // claimed.
80 let slot_index = self.tail_position.fetch_add(1, Acquire);
81
82 let block = self.find_block(slot_index);
83
84 unsafe { block.as_ref().tx_close() }
85 }
86
87 fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
88 // The start index of the block that contains `index`.
89 let start_index = block::start_index(slot_index);
90
91 // The index offset into the block
92 let offset = block::offset(slot_index);
93
94 // Load the current head of the block
95 let mut block_ptr = self.block_tail.load(Acquire);
96
97 let block = unsafe { &*block_ptr };
98
99 // Calculate the distance between the tail ptr and the target block
100 let distance = block.distance(start_index);
101
102 // Decide if this call to `find_block` should attempt to update the
103 // `block_tail` pointer.
104 //
105 // Updating `block_tail` is not always performed in order to reduce
106 // contention.
107 //
108 // When set, as the routine walks the linked list, it attempts to update
109 // `block_tail`. If the update cannot be performed, `try_updating_tail`
110 // is unset.
111 let mut try_updating_tail = distance > offset;
112
113 // Walk the linked list of blocks until the block with `start_index` is
114 // found.
115 loop {
116 let block = unsafe { &(*block_ptr) };
117
118 if block.is_at_index(start_index) {
119 return unsafe { NonNull::new_unchecked(block_ptr) };
120 }
121
122 let next_block = block
123 .load_next(Acquire)
124 // There is no allocated next block, grow the linked list.
125 .unwrap_or_else(|| block.grow());
126
127 // If the block is **not** final, then the tail pointer cannot be
128 // advanced any more.
129 try_updating_tail &= block.is_final();
130
131 if try_updating_tail {
132 // Advancing `block_tail` must happen when walking the linked
133 // list. `block_tail` may not advance passed any blocks that are
134 // not "final". At the point a block is finalized, it is unknown
135 // if there are any prior blocks that are unfinalized, which
136 // makes it impossible to advance `block_tail`.
137 //
138 // While walking the linked list, `block_tail` can be advanced
139 // as long as finalized blocks are traversed.
140 //
141 // Release ordering is used to ensure that any subsequent reads
142 // are able to see the memory pointed to by `block_tail`.
143 //
144 // Acquire is not needed as any "actual" value is not accessed.
145 // At this point, the linked list is walked to acquire blocks.
146 let actual =
147 self.block_tail
148 .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
149
150 if actual == block_ptr {
151 // Synchronize with any senders
152 let tail_position = self.tail_position.fetch_add(0, Release);
153
154 unsafe {
155 block.tx_release(tail_position);
156 }
157 } else {
158 // A concurrent sender is also working on advancing
159 // `block_tail` and this thread is falling behind.
160 //
161 // Stop trying to advance the tail pointer
162 try_updating_tail = false;
163 }
164 }
165
166 block_ptr = next_block.as_ptr();
167
168 loom::yield_now();
169 }
170 }
171
172 pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
173 debug!("+ reclaim_block({:p})", block);
174 // The block has been removed from the linked list and ownership
175 // is reclaimed.
176 //
177 // Before dropping the block, see if it can be reused by
178 // inserting it back at the end of the linked list.
179 //
180 // First, reset the data
181 block.as_mut().reclaim();
182
183 let mut reused = false;
184
185 // Attempt to insert the block at the end
186 //
187 // Walk at most three times
188 //
189 let curr_ptr = self.block_tail.load(Acquire);
190
191 // The pointer can never be null
192 debug_assert!(!curr_ptr.is_null());
193
194 let mut curr = NonNull::new_unchecked(curr_ptr);
195
196 // TODO: Unify this logic with Block::grow
197 for _ in 0..3 {
198 match curr.as_ref().try_push(&mut block, AcqRel) {
199 Ok(_) => {
200 reused = true;
201 break;
202 }
203 Err(next) => {
204 curr = next;
205 }
206 }
207 }
208
209 if !reused {
210 debug!(" + block freed {:p}", block);
211 let _ = Box::from_raw(block.as_ptr());
212 }
213 }
214}
215
216impl<T> fmt::Debug for Tx<T> {
217 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
218 fmt.debug_struct("Tx")
219 .field("block_tail", &self.block_tail.load(Relaxed))
220 .field("tail_position", &self.tail_position.load(Relaxed))
221 .finish()
222 }
223}
224
225impl<T> Rx<T> {
226 /// Pop the next value off the queue
227 pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
228 // Advance `head`, if needed
229 if !self.try_advancing_head() {
230 debug!("+ !self.try_advancing_head() -> false");
231 return None;
232 }
233
234 self.reclaim_blocks(tx);
235
236 unsafe {
237 let block = self.head.as_ref();
238
239 let ret = block.read(self.index);
240
241 if let Some(block::Read::Value(..)) = ret {
242 self.index = self.index.wrapping_add(1);
243 }
244
245 ret
246 }
247 }
248
249 /// Try advancing the block pointer to the block referenced by `self.index`.
250 ///
251 /// Returns `true` if successful, `false` if there is no next block to load.
252 fn try_advancing_head(&mut self) -> bool {
253 let block_index = block::start_index(self.index);
254
255 loop {
256 let next_block = {
257 let block = unsafe { self.head.as_ref() };
258
259 if block.is_at_index(block_index) {
260 return true;
261 }
262
263 block.load_next(Acquire)
264 };
265
266 let next_block = match next_block {
267 Some(next_block) => next_block,
268 None => {
269 return false;
270 }
271 };
272
273 self.head = next_block;
274
275 loom::yield_now();
276 }
277 }
278
279 fn reclaim_blocks(&mut self, tx: &Tx<T>) {
280 debug!("+ reclaim_blocks()");
281
282 while self.free_head != self.head {
283 unsafe {
284 // Get a handle to the block that will be freed and update
285 // `free_head` to point to the next block.
286 let block = self.free_head;
287
288 let observed_tail_position = block.as_ref().observed_tail_position();
289
290 let required_index = match observed_tail_position {
291 Some(i) => i,
292 None => return,
293 };
294
295 if required_index > self.index {
296 return;
297 }
298
299 // We may read the next pointer with `Relaxed` ordering as it is
300 // guaranteed that the `reclaim_blocks` routine trails the `recv`
301 // routine. Any memory accessed by `reclaim_blocks` has already
302 // been acquired by `recv`.
303 let next_block = block.as_ref().load_next(Relaxed);
304
305 // Update the free list head
306 self.free_head = next_block.unwrap();
307
308 // Push the emptied block onto the back of the queue, making it
309 // available to senders.
310 tx.reclaim_block(block);
311 }
312
313 loom::yield_now();
314 }
315 }
316
317 /// Effectively `Drop` all the blocks. Should only be called once, when
318 /// the list is dropping.
319 pub(super) unsafe fn free_blocks(&mut self) {
320 debug!("+ free_blocks()");
321 debug_assert_ne!(self.free_head, NonNull::dangling());
322
323 let mut cur = Some(self.free_head);
324
325 #[cfg(debug_assertions)]
326 {
327 // to trigger the debug assert above so as to catch that we
328 // don't call `free_blocks` more than once.
329 self.free_head = NonNull::dangling();
330 self.head = NonNull::dangling();
331 }
332
333 while let Some(block) = cur {
334 cur = block.as_ref().load_next(Relaxed);
335 debug!(" + free: block = {:p}", block);
336 drop(Box::from_raw(block.as_ptr()));
337 }
338 }
339}
340
341impl<T> fmt::Debug for Rx<T> {
342 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
343 fmt.debug_struct("Rx")
344 .field("head", &self.head)
345 .field("index", &self.index)
346 .field("free_head", &self.free_head)
347 .finish()
348 }
349}