// tokio_sync/mpsc/block.rs
1use loom::{
2 self,
3 sync::atomic::{AtomicPtr, AtomicUsize},
4 sync::CausalCell,
5};
6
7use std::mem::{self, ManuallyDrop};
8use std::ops;
9use std::ptr::{self, NonNull};
10use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
11
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ///
    /// The low `BLOCK_CAP` bits are per-slot ready flags; the `RELEASED` and
    /// `TX_CLOSED` flags live in the bits above them.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    ///
    /// Only meaningful once the `RELEASED` flag is set in `ready_slots`.
    observed_tail_position: CausalCell<usize>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}
36
/// The result of reading a slot in a block.
pub(crate) enum Read<T> {
    /// A value was moved out of the slot.
    Value(T),
    /// The slot held no value and the send half of the channel has closed;
    /// no further values will arrive.
    Closed,
}
41
/// Fixed-size storage for a block's slots.
///
/// Each slot is a `CausalCell` (loom-checked unsynchronized access) holding a
/// `ManuallyDrop<T>`, because slot initialization and dropping are managed
/// manually by the channel.
struct Values<T>([CausalCell<ManuallyDrop<T>>; BLOCK_CAP]);
43
44use super::BLOCK_CAP;
45
/// Masks an index to get the block identifier.
///
/// Relies on `BLOCK_CAP` being a power of two, so the low `log2(BLOCK_CAP)`
/// bits of an index are the slot offset and the rest identify the block.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block. Stored in
/// `ready_slots` just above the per-slot ready bits.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness (excludes the
/// `RELEASED` and `TX_CLOSED` flags).
const READY_MASK: usize = RELEASED - 1;
64
65/// Returns the index of the first slot in the block referenced by `slot_index`.
66#[inline(always)]
67pub(crate) fn start_index(slot_index: usize) -> usize {
68 BLOCK_MASK & slot_index
69}
70
71/// Returns the offset into the block referenced by `slot_index`.
72#[inline(always)]
73pub(crate) fn offset(slot_index: usize) -> usize {
74 SLOT_MASK & slot_index
75}
76
impl<T> Block<T> {
    /// Creates an empty block whose first slot has channel index `start_index`.
    pub(crate) fn new(start_index: usize) -> Block<T> {
        Block {
            // The absolute index in the channel of the first slot in the block.
            start_index,

            // Pointer to the next block in the linked list.
            next: AtomicPtr::new(ptr::null_mut()),

            // No slots ready, no RELEASED / TX_CLOSED flags set.
            ready_slots: AtomicUsize::new(0),

            observed_tail_position: CausalCell::new(0),

            // Value storage; slots are left uninitialized until written.
            values: unsafe { Values::uninitialized() },
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        // `index` must be a block start index (offset bits clear).
        debug_assert!(offset(index) == 0);
        self.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        // Wrapping subtraction keeps the distance correct even if the
        // channel index space has wrapped around.
        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
    }

    /// Read the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    /// * Each ready slot is read at most once — the value is moved out with
    ///   `ptr::read`, so a second read would duplicate (and double-drop) it.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        // Acquire load synchronizes with the sender's Release in
        // `set_ready` / `tx_close`, making the written value visible.
        let ready_bits = self.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value; this moves it out of the slot bitwise.
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(ManuallyDrop::into_inner(value)))
    }

    /// Write a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, ManuallyDrop::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    ///
    /// NOTE(review): marked `unsafe` with no stated contract — presumably it
    /// must only be called once no more sends can occur; confirm at callers.
    pub(crate) unsafe fn tx_close(&self) {
        // Release pairs with the receiver's Acquire load in `read`.
        self.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Reset the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.start_index = 0;
        self.next = AtomicPtr::new(ptr::null_mut());
        // Clearing `ready_slots` also clears RELEASED, so the stale
        // `observed_tail_position` value becomes unobservable and does not
        // need to be reset here.
        self.ready_slots = AtomicUsize::new(0);
    }

    /// Release the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled. The Release ordering
        // publishes the `observed_tail_position` store above.
        self.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready, publishing the value written to it.
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        // Release pairs with the Acquire load in `read`.
        self.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// All per-slot `ready` flags are coalesced as bits in the single
    /// `ready_slots` atomic cell, so one `Acquire` load — masked with
    /// `READY_MASK` to exclude the `RELEASED` / `TX_CLOSED` flags — checks
    /// every slot at once.
    pub(crate) fn is_final(&self) -> bool {
        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set.
    ///
    /// Returns `None` until `tx_release` has set the `RELEASED` flag; the
    /// Acquire load synchronizes with that Release so the stored position
    /// is visible.
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.ready_slots.load(Acquire) {
            None
        } else {
            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
        }
    }

    /// Load the next block in the linked list, if one has been pushed.
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.next.load(ordering));

        // The next block, when present, must cover the index range
        // immediately following this block's.
        debug_assert!(unsafe {
            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
                .unwrap_or(true)
        });

        ret
    }

    /// Push `block` as the next block in the link.
    ///
    /// Returns Ok if successful, otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using AcqRel ordering.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        ordering: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        // `block` becomes the successor of `self`, so its start index is
        // fixed up before it is made visible to other threads.
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .next
            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);

        match NonNull::new(next_ptr) {
            // Non-null: another thread already installed a successor.
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grow the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly (inside `try_push`).
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));

        // Ownership of the allocation is handed to the linked list; the raw
        // pointer is non-null because `Box::into_raw` never returns null.
        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(self.next.compare_and_swap(
            ptr::null_mut(),
            new_block.as_ptr(),
            AcqRel,
        ));

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };

            curr = match actual {
                Ok(_) => {
                    // `new_block` was installed somewhere down the list;
                    // return the block that directly follows `self`.
                    return next;
                }
                Err(curr) => curr,
            };

            // When running outside of loom, this calls `spin_loop_hint`.
            loom::yield_now();
        }
    }
}
352
/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    bits & (1 << slot) != 0
}
358
359/// Returns `true` if the closed flag has been set.
360fn is_tx_closed(bits: usize) -> bool {
361 TX_CLOSED == bits & TX_CLOSED
362}
363
impl<T> Values<T> {
    /// Creates slot storage without initializing any slot.
    ///
    /// # Safety
    ///
    /// The returned array is uninitialized memory. Callers must write a slot
    /// (see `Block::write`) before the value in it is read.
    ///
    /// NOTE(review): `mem::uninitialized` is deprecated in later Rust
    /// releases in favor of `MaybeUninit`; consider migrating when the
    /// minimum supported toolchain allows.
    unsafe fn uninitialized() -> Values<T> {
        let mut vals = mem::uninitialized();

        // When fuzzing, `CausalCell` needs to be initialized.
        if_fuzz! {
            use std::ptr;

            for v in &mut vals {
                ptr::write(
                    v as *mut _,
                    CausalCell::new(mem::zeroed()));
            }
        }

        Values(vals)
    }
}
382
383impl<T> ops::Index<usize> for Values<T> {
384 type Output = CausalCell<ManuallyDrop<T>>;
385
386 fn index(&self, index: usize) -> &Self::Output {
387 self.0.index(index)
388 }
389}