tokio_sync/mpsc/list.rs

//! A concurrent, lock-free, FIFO list.

use super::block::{self, Block};

use loom::{
    self,
    sync::atomic::{AtomicPtr, AtomicUsize},
};

use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// List queue transmit handle
pub(crate) struct Tx<T> {
    /// Tail in the `Block` mpmc list.
    block_tail: AtomicPtr<Block<T>>,

    /// Position to push the next message. This references a block and an
    /// offset into the block.
    tail_position: AtomicUsize,
}

/// List queue receive handle
pub(crate) struct Rx<T> {
    /// Pointer to the block being processed
    head: NonNull<Block<T>>,

    /// Next slot index to process
    index: usize,

    /// Pointer to the next block pending release
    free_head: NonNull<Block<T>>,
}

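/// Creates the list-backed channel, returning the paired transmit and receive
/// handles that share the initial block.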
pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
    // Create the initial block shared between the tx and rx halves.
    let initial_block = Box::new(Block::new(0));
    let initial_block_ptr = Box::into_raw(initial_block);

    let tx = Tx {
        block_tail: AtomicPtr::new(initial_block_ptr),
        tail_position: AtomicUsize::new(0),
    };

    let head = NonNull::new(initial_block_ptr).unwrap();

    let rx = Rx {
        head,
        index: 0,
        free_head: head,
    };

    (tx, rx)
}

impl<T> Tx<T> {
    /// Push a value into the list.
    pub(crate) fn push(&self, value: T) {
        // First, claim a slot for the value. `Acquire` is used here to
        // synchronize with the `fetch_add` in `reclaim_blocks`.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        // Load the current block and write the value
        let block = self.find_block(slot_index);

        unsafe {
            // Write the value to the block
            block.as_ref().write(slot_index, value);
        }
    }

    /// Close the send half of the list
    ///
    /// Similar to pushing a value, but instead of writing the value and
    /// setting the ready flag, the TX_CLOSED flag is set on the block.
    pub(crate) fn close(&self) {
        // First, claim a slot for the value. This is the last slot that will be
        // claimed.
        let slot_index = self.tail_position.fetch_add(1, Acquire);

        let block = self.find_block(slot_index);

        unsafe { block.as_ref().tx_close() }
    }

    fn find_block(&self, slot_index: usize) -> NonNull<Block<T>> {
        // The start index of the block that contains `slot_index`.
        let start_index = block::start_index(slot_index);

        // The index offset into the block
        let offset = block::offset(slot_index);

        // Load the current block tail pointer
        let mut block_ptr = self.block_tail.load(Acquire);

        let block = unsafe { &*block_ptr };

        // Calculate the distance between the tail ptr and the target block
        let distance = block.distance(start_index);

        // Decide if this call to `find_block` should attempt to update the
        // `block_tail` pointer.
        //
        // Updating `block_tail` is not always performed in order to reduce
        // contention.
        //
        // When set, as the routine walks the linked list, it attempts to update
        // `block_tail`. If the update cannot be performed, `try_updating_tail`
        // is unset.
        let mut try_updating_tail = distance > offset;

        // Walk the linked list of blocks until the block with `start_index` is
        // found.
        loop {
            let block = unsafe { &(*block_ptr) };

            if block.is_at_index(start_index) {
                return unsafe { NonNull::new_unchecked(block_ptr) };
            }

            let next_block = block
                .load_next(Acquire)
                // There is no allocated next block, grow the linked list.
                .unwrap_or_else(|| block.grow());

            // If the block is **not** final, then the tail pointer cannot be
            // advanced any more.
            try_updating_tail &= block.is_final();

            if try_updating_tail {
                // Advancing `block_tail` must happen when walking the linked
                // list. `block_tail` may not advance past any blocks that are
                // not "final". At the point a block is finalized, it is unknown
                // if there are any prior blocks that are unfinalized, which
                // makes it impossible to advance `block_tail`.
                //
                // While walking the linked list, `block_tail` can be advanced
                // as long as finalized blocks are traversed.
                //
                // Release ordering is used to ensure that any subsequent reads
                // are able to see the memory pointed to by `block_tail`.
                //
                // Acquire is not needed as no "actual" value is accessed.
                // At this point, the linked list is walked to acquire blocks.
                let actual =
                    self.block_tail
                        .compare_and_swap(block_ptr, next_block.as_ptr(), Release);

                if actual == block_ptr {
                    // Synchronize with any senders
                    let tail_position = self.tail_position.fetch_add(0, Release);

                    unsafe {
                        block.tx_release(tail_position);
                    }
                } else {
                    // A concurrent sender is also working on advancing
                    // `block_tail` and this thread is falling behind.
                    //
                    // Stop trying to advance the tail pointer
                    try_updating_tail = false;
                }
            }

            block_ptr = next_block.as_ptr();

            loom::yield_now();
        }
    }

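    /// Reinserts a fully consumed block at the end of the list so senders can
    /// reuse it, or frees it if no spot is found within a few steps.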
    pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
        debug!("+ reclaim_block({:p})", block);
        // The block has been removed from the linked list and ownership
        // is reclaimed.
        //
        // Before dropping the block, see if it can be reused by
        // inserting it back at the end of the linked list.
        //
        // First, reset the data
        block.as_mut().reclaim();

        let mut reused = false;

        // Attempt to insert the block at the end
        //
        // Walk at most three times
        //
        let curr_ptr = self.block_tail.load(Acquire);

        // The pointer can never be null
        debug_assert!(!curr_ptr.is_null());

        let mut curr = NonNull::new_unchecked(curr_ptr);

        // TODO: Unify this logic with Block::grow
        for _ in 0..3 {
            match curr.as_ref().try_push(&mut block, AcqRel) {
                Ok(_) => {
                    reused = true;
                    break;
                }
                Err(next) => {
                    curr = next;
                }
            }
        }

        if !reused {
            debug!(" + block freed {:p}", block);
            let _ = Box::from_raw(block.as_ptr());
        }
    }
}

impl<T> fmt::Debug for Tx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Tx")
            .field("block_tail", &self.block_tail.load(Relaxed))
            .field("tail_position", &self.tail_position.load(Relaxed))
            .finish()
    }
}

impl<T> Rx<T> {
    /// Pop the next value off the queue
    pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
        // Advance `head`, if needed
        if !self.try_advancing_head() {
            debug!("+ !self.try_advancing_head() -> false");
            return None;
        }

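        // Return any fully read blocks to the senders for reuse before reading
        // the current slot.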
        self.reclaim_blocks(tx);

        unsafe {
            let block = self.head.as_ref();

            let ret = block.read(self.index);

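            // Only advance the index when an actual value was read; a close
            // notification leaves `index` unchanged.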
            if let Some(block::Read::Value(..)) = ret {
                self.index = self.index.wrapping_add(1);
            }

            ret
        }
    }

    /// Try advancing the block pointer to the block referenced by `self.index`.
    ///
    /// Returns `true` if successful, `false` if there is no next block to load.
    fn try_advancing_head(&mut self) -> bool {
        let block_index = block::start_index(self.index);

        loop {
            let next_block = {
                let block = unsafe { self.head.as_ref() };

                if block.is_at_index(block_index) {
                    return true;
                }

                block.load_next(Acquire)
            };

            let next_block = match next_block {
                Some(next_block) => next_block,
                None => {
                    return false;
                }
            };

            self.head = next_block;

            loom::yield_now();
        }
    }

    fn reclaim_blocks(&mut self, tx: &Tx<T>) {
        debug!("+ reclaim_blocks()");

        while self.free_head != self.head {
            unsafe {
                // Get a handle to the block that will be freed and update
                // `free_head` to point to the next block.
                let block = self.free_head;

                let observed_tail_position = block.as_ref().observed_tail_position();

                let required_index = match observed_tail_position {
                    Some(i) => i,
                    None => return,
                };

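                // A block may only be reclaimed once the receiver's index has
                // caught up to the tail position that was observed when the
                // block was released.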
                if required_index > self.index {
                    return;
                }

                // We may read the next pointer with `Relaxed` ordering as it is
                // guaranteed that the `reclaim_blocks` routine trails the `recv`
                // routine. Any memory accessed by `reclaim_blocks` has already
                // been acquired by `recv`.
                let next_block = block.as_ref().load_next(Relaxed);

                // Update the free list head
                self.free_head = next_block.unwrap();

                // Push the emptied block onto the back of the queue, making it
                // available to senders.
                tx.reclaim_block(block);
            }

            loom::yield_now();
        }
    }

    /// Effectively `Drop` all the blocks. Should only be called once, when
    /// the list is dropping.
    pub(super) unsafe fn free_blocks(&mut self) {
        debug!("+ free_blocks()");
        debug_assert_ne!(self.free_head, NonNull::dangling());

        let mut cur = Some(self.free_head);

        #[cfg(debug_assertions)]
        {
            // Set the head pointers to a dangling value so that a second call
            // to `free_blocks` trips the `debug_assert_ne!` above.
            self.free_head = NonNull::dangling();
            self.head = NonNull::dangling();
        }

        while let Some(block) = cur {
            cur = block.as_ref().load_next(Relaxed);
            debug!(" + free: block = {:p}", block);
            drop(Box::from_raw(block.as_ptr()));
        }
    }
}

impl<T> fmt::Debug for Rx<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Rx")
            .field("head", &self.head)
            .field("index", &self.index)
            .field("free_head", &self.free_head)
            .finish()
    }
}
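
// A minimal smoke-test sketch of the `channel` / `push` / `pop` flow, not a
// definitive test suite. It assumes the crate's `loom` facade maps to plain
// std primitives in ordinary `cargo test` builds and that `block::Read::Value`
// carries the pushed value; both live outside this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_pop_in_order() {
        let (tx, mut rx) = channel();

        // Nothing has been pushed yet, so the receiver sees an empty list.
        assert!(rx.pop(&tx).is_none());

        tx.push(1);
        tx.push(2);

        // Values come back out in FIFO order.
        match rx.pop(&tx) {
            Some(block::Read::Value(v)) => assert_eq!(v, 1),
            _ => panic!("expected the first value"),
        }
        match rx.pop(&tx) {
            Some(block::Read::Value(v)) => assert_eq!(v, 2),
            _ => panic!("expected the second value"),
        }

        assert!(rx.pop(&tx).is_none());

        // Release the blocks; the owner of the list normally does this when it
        // is dropped.
        unsafe { rx.free_blocks() };
    }
}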