Skip to main content

crossfire/flavor/
array_mpsc.rs

1use super::{FlavorBounded, FlavorImpl, FlavorSelect, Queue, Token};
2use crate::crossbeam::array_queue_mpsc::ArrayQueueMpsc;
3use std::mem::MaybeUninit;
4
5/// Simplified ArrayQueue tweaks for MPSC
6///
7/// Push and pop fast path reduced one atomic ops compared to its MPMC version (only 3 ops instead
8/// of 4),
9/// and it's faster to detect the empty / full condition (2 ops instead of 3).
10pub struct ArrayMpsc<T>(ArrayQueueMpsc<T>);
11
12impl<T> ArrayMpsc<T> {
13    pub fn new(mut bound: usize) -> Self {
14        assert!(bound <= u32::MAX as usize);
15        if bound == 0 {
16            bound = 1;
17        }
18        Self(ArrayQueueMpsc::<T>::new(bound))
19    }
20}
21
22impl<T> Queue for ArrayMpsc<T> {
23    type Item = T;
24
25    #[inline(always)]
26    fn pop(&self) -> Option<T>
27    where
28        T: Send,
29    {
30        self.0.pop(true)
31    }
32
33    #[inline(always)]
34    fn push(&self, item: T) -> Result<(), T>
35    where
36        T: Send,
37    {
38        let _item = MaybeUninit::new(item);
39        if unsafe { self.0.push_with_ptr(_item.as_ptr()) } {
40            Ok(())
41        } else {
42            Err(unsafe { _item.assume_init_read() })
43        }
44    }
45
46    #[inline(always)]
47    fn is_full(&self) -> bool {
48        self.0.is_full()
49    }
50
51    #[inline(always)]
52    fn is_empty(&self) -> bool {
53        self.0.is_empty()
54    }
55
56    #[inline(always)]
57    fn len(&self) -> usize {
58        self.0.len()
59    }
60
61    #[inline(always)]
62    fn capacity(&self) -> Option<usize> {
63        Some(self.0.capacity())
64    }
65}
66
67impl<T> FlavorImpl for ArrayMpsc<T> {
68    #[inline(always)]
69    fn try_send(&self, item: &MaybeUninit<T>) -> bool {
70        unsafe { self.0.push_with_ptr(item.as_ptr()) }
71    }
72
73    #[inline(always)]
74    fn try_send_oneshot(&self, item: *const T) -> Option<bool> {
75        unsafe { self.0.try_push_oneshot(item) }
76    }
77
78    #[inline(always)]
79    fn try_recv_cached(&self) -> Option<T> {
80        self.0.pop_cached()
81    }
82
83    #[inline]
84    fn try_recv(&self) -> Option<T> {
85        self.0.pop(false)
86    }
87
88    #[inline]
89    fn try_recv_final(&self) -> Option<T> {
90        self.0.pop(true)
91    }
92
93    #[inline]
94    fn backoff_limit(&self) -> u16 {
95        if self.0.capacity() > 10 {
96            crate::backoff::DEFAULT_LIMIT
97        } else {
98            #[cfg(target_arch = "x86_64")]
99            {
100                crate::backoff::DEFAULT_LIMIT
101            }
102            #[cfg(not(target_arch = "x86_64"))]
103            {
104                crate::backoff::MAX_LIMIT
105            }
106        }
107    }
108
109    #[inline]
110    fn may_direct_copy(&self) -> bool {
111        true
112    }
113}
114
115impl<T> FlavorSelect for ArrayMpsc<T> {
116    #[inline]
117    fn try_select(&self, final_check: bool) -> Option<Token> {
118        self.0.start_read(final_check)
119    }
120
121    #[inline(always)]
122    fn read_with_token(&self, token: Token) -> T {
123        self.0.read(token)
124    }
125}
126
127impl<T> FlavorBounded for ArrayMpsc<T> {
128    #[inline(always)]
129    fn new_with_bound(size: usize) -> Self {
130        Self::new(size)
131    }
132}