Skip to main content

tokio_sync/mpsc/
block.rs

1use loom::{
2    self,
3    sync::atomic::{AtomicPtr, AtomicUsize},
4    sync::CausalCell,
5};
6
7use std::mem::{self, ManuallyDrop};
8use std::ops;
9use std::ptr::{self, NonNull};
10use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};
11
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ///
    /// The low `BLOCK_CAP` bits are per-slot ready flags; the `RELEASED` and
    /// `TX_CLOSED` flags live in the bits above them (see the constants below).
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    ///
    /// Only meaningful once the `RELEASED` bit is set in `ready_slots`; see
    /// `observed_tail_position()`.
    observed_tail_position: CausalCell<usize>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}
36
/// The result of attempting to read a slot (see `Block::read`).
pub(crate) enum Read<T> {
    /// A value was ready and has been moved out of the slot.
    Value(T),
    /// The slot is empty and the send half of the channel has closed; no
    /// further value will arrive in this slot.
    Closed,
}
41
/// Backing storage for a block's `BLOCK_CAP` value slots.
///
/// Slots hold `ManuallyDrop<T>` because initialization and destruction are
/// managed manually through the ready-bit protocol, not by Rust's drop glue.
struct Values<T>([CausalCell<ManuallyDrop<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier
///
/// NOTE(review): this mask (and `SLOT_MASK`) only works if `BLOCK_CAP` is a
/// power of two — confirm that invariant where `BLOCK_CAP` is defined.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block. Stored in
/// the first bit above the per-slot ready flags.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness (the low `BLOCK_CAP`
/// bits of `ready_slots`).
const READY_MASK: usize = RELEASED - 1;
64
65/// Returns the index of the first slot in the block referenced by `slot_index`.
66#[inline(always)]
67pub(crate) fn start_index(slot_index: usize) -> usize {
68    BLOCK_MASK & slot_index
69}
70
71/// Returns the offset into the block referenced by `slot_index`.
72#[inline(always)]
73pub(crate) fn offset(slot_index: usize) -> usize {
74    SLOT_MASK & slot_index
75}
76
impl<T> Block<T> {
    /// Creates a new, empty block whose first slot has index `start_index`.
    pub(crate) fn new(start_index: usize) -> Block<T> {
        Block {
            // The absolute index in the channel of the first slot in the block.
            start_index,

            // Pointer to the next block in the linked list.
            next: AtomicPtr::new(ptr::null_mut()),

            // No slots are ready, released, or closed yet.
            ready_slots: AtomicUsize::new(0),

            observed_tail_position: CausalCell::new(0),

            // Value storage. Uninitialized: slots become valid only once
            // written and their ready bit set.
            values: unsafe { Values::uninitialized() },
        }
    }

    /// Returns `true` if the block matches the given index
    ///
    /// `index` must be block-aligned (offset of zero).
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `start_index` must represent a block *after* `self`.
    /// Wrapping subtraction keeps the distance correct even if the absolute
    /// indices have wrapped around `usize::MAX`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
    }

    /// Read the value at the given offset.
    ///
    /// Returns `None` if the slot is empty, `Some(Read::Closed)` if the slot
    /// is empty and all senders have dropped, and `Some(Read::Value(..))`
    /// otherwise.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    ///
    /// Note that the ready bit is *not* cleared here; the caller must not
    /// read the same slot twice, as the value is moved out with `ptr::read`.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        // Acquire pairs with the Release in `set_ready` / `tx_close`, making
        // the written value (or the closed state) visible.
        let ready_bits = self.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value. `ptr::read` moves it out without running drop glue
        // on the slot (the slot is `ManuallyDrop`).
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(ManuallyDrop::into_inner(value)))
    }

    /// Write a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, ManuallyDrop::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    ///
    /// Release ordering publishes all prior writes before the closed flag
    /// becomes visible.
    pub(crate) unsafe fn tx_close(&self) {
        self.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Reset the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        // `&mut self` guarantees exclusive access, so plain (non-atomic-RMW)
        // replacement of the atomics is sound here.
        self.start_index = 0;
        self.next = AtomicPtr::new(ptr::null_mut());
        self.ready_slots = AtomicUsize::new(0);
    }

    /// Release the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled. The Release ordering
        // publishes the `observed_tail_position` write above.
        self.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    ///
    /// Release ordering pairs with the Acquire load in `read`, publishing the
    /// slot's value before the ready bit becomes visible.
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// All per-slot ready flags are coalesced as bits of the single
    /// `ready_slots` atomic, so finality is a single load compared against
    /// `READY_MASK` (the low `BLOCK_CAP` bits all set).
    pub(crate) fn is_final(&self) -> bool {
        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    ///
    /// `None` until `tx_release` has set the `RELEASED` bit; the Acquire load
    /// makes the preceding `observed_tail_position` write visible, so the
    /// unsynchronized read of the cell below is sound.
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.ready_slots.load(Acquire) {
            None
        } else {
            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
        }
    }

    /// Load the next block
    ///
    /// Returns `None` when there is no next block. `ordering` is supplied by
    /// the caller; it must be at least Acquire if the caller will dereference
    /// the returned pointer.
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.next.load(ordering));

        // Sanity check (debug builds only): a linked block's start index is
        // always exactly one block past ours.
        debug_assert!(unsafe {
            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
                .unwrap_or(true)
        });

        ret
    }

    /// Push `block` as the next block in the link.
    ///
    /// Returns Ok if successful, otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the caller-supplied
    /// `ordering` (callers in this file pass `AcqRel`).
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        ordering: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        // Fix up the pushed block's start index to follow `self` before it
        // becomes visible to other threads.
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .next
            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);

        match NonNull::new(next_ptr) {
            // Non-null: another block was already linked; return it.
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grow the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));

        // `Box::into_raw` never returns null, so `new_unchecked` is sound.
        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(self.next.compare_and_swap(
            ptr::null_mut(),
            new_block.as_ptr(),
            AcqRel,
        ));

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeed.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };

            curr = match actual {
                Ok(_) => {
                    // Pushed at the tail; return the block that directly
                    // follows `self`, not the newly allocated one.
                    return next;
                }
                Err(curr) => curr,
            };

            // When running outside of loom, this calls `spin_loop_hint`.
            loom::yield_now();
        }
    }
}
352
/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    bits & (1 << slot) != 0
}
358
359/// Returns `true` if the closed flag has been set.
360fn is_tx_closed(bits: usize) -> bool {
361    TX_CLOSED == bits & TX_CLOSED
362}
363
364impl<T> Values<T> {
365    unsafe fn uninitialized() -> Values<T> {
366        let mut vals = mem::uninitialized();
367
368        // When fuzzing, `CausalCell` needs to be initialized.
369        if_fuzz! {
370            use std::ptr;
371
372            for v in &mut vals {
373                ptr::write(
374                    v as *mut _,
375                    CausalCell::new(mem::zeroed()));
376            }
377        }
378
379        Values(vals)
380    }
381}
382
383impl<T> ops::Index<usize> for Values<T> {
384    type Output = CausalCell<ManuallyDrop<T>>;
385
386    fn index(&self, index: usize) -> &Self::Output {
387        self.0.index(index)
388    }
389}