// ico_memory/mem/queue.rs

1use crate::sync::IndexSpinlock;
2use core::marker::PhantomData;
3use core::num::NonZeroUsize;
4use core::sync::atomic::AtomicU32;
5use core::sync::atomic::AtomicUsize;
6use core::sync::atomic::Ordering;
7
8// #[allow(dead_code)]
9// #[allow(unions_with_drop_fields)]
10// pub union Swap<T, U>
11// where
12//     T: Sized,
13// {
14//     base: T,
15//     other: U,
16// }
17// // #[allow(dead_code)]
18// impl<T, U> Swap<T, U>
19// where
20//     T: Sized,
21// {
22//     pub const unsafe fn get(value: T) -> U {
23//         return Swap { base: value }.other;
24//     }
25// }
26
/// Thin owning-pointer wrapper, modeled on the (unstable) `core::ptr::Unique`.
struct Unique<T> {
    raw: *const T,           // kept as *const so Unique<T> stays covariant in T
    _marker: PhantomData<T>, // tells the drop checker we logically own a T
}

// Because a Unique<T> is the sole owner of its pointee, it is exactly as
// thread-safe as T itself — so forwarding Send/Sync from T is sound.
unsafe impl<T: Send> Send for Unique<T> {}
unsafe impl<T: Sync> Sync for Unique<T> {}

impl<T> Unique<T> {
    /// Wraps `ptr`; the caller retains responsibility for its validity.
    pub const fn new(ptr: *mut T) -> Self {
        Unique {
            raw: ptr,
            _marker: PhantomData,
        }
    }

    /// Returns the wrapped pointer, restored to mutability.
    pub fn as_ptr(&self) -> *mut T {
        self.raw as *mut T
    }
}
49
50/// A MPMC Queue based on Dmitry Vyukov's queue.  
51/// However, there is a slight modification where head and tail can be locked, as my implementation of Dmitry's queue failed some tests under peak contention  - and I've opted for a more conservative queue
52pub const QUEUE_NULL: usize = 0;
53#[repr(C)]
54pub struct QueueUsize<'a> {
55    _cache_pad_0: [u8; 64],
56    // buffer: &'a [AtomicUsize],
57    buffer: Unique<AtomicUsize>,
58    // buffer_ptr : *const AtomicUsize,
59    capacity: u32,
60    buffer_capacity_mask: u32,
61    _cache_pad_1: [u8; 64],
62    head: IndexSpinlock,
63    _cache_pad_2: [u8; 64],
64    tail: IndexSpinlock,
65    _cache_pad_3: [u8; 64],
66    _lifetime: PhantomData<&'a AtomicUsize>,
67}
68
69impl<'a> QueueUsize<'a> {
70    // const CAPACITY_MASK : u32 = CAPACITY as u32 - 1;
71
72    pub const unsafe fn from_static(
73        slice: &'a *mut AtomicUsize,
74        capacity: usize,
75    ) -> QueueUsize<'a> {
76        //pub const fn new(buffer_ptr : *const usize, capacity : usize)->Queue{
77
78        return QueueUsize {
79            head: IndexSpinlock::new(0),
80            tail: IndexSpinlock::new(0),
81            buffer: Unique::new(*slice),
82            // buffer_ptr : slice.as_ptr() as *const AtomicUsize,
83            capacity: capacity as u32,
84            buffer_capacity_mask: capacity as u32 - 1,
85            _cache_pad_0: [0; 64],
86            _cache_pad_1: [0; 64],
87            _cache_pad_2: [0; 64],
88            _cache_pad_3: [0; 64],
89            _lifetime: PhantomData,
90        };
91    }
92    pub fn clear(&self) {
93        let mut tail = self.tail.lock();
94        let mut head = self.head.lock();
95        for i in 0..self.capacity {
96            unsafe {
97                self.buffer
98                    .as_ptr()
99                    .offset(i as isize)
100                    .as_ref()
101                    .unwrap()
102                    .store(QUEUE_NULL, Ordering::Relaxed);
103            }
104        }
105        tail.write(0);
106        head.write(0);
107    }
108
109    pub fn enqueue(&self, value: NonZeroUsize) -> bool {
110        let v = value.get();
111        debug_assert_ne!(v, QUEUE_NULL);
112
113        let mut tail = self.tail.lock();
114        let tail_value = tail.read();
115
116        let storage = unsafe {
117            self.buffer
118                .as_ptr()
119                .offset(tail_value as isize)
120                .as_ref()
121                .unwrap()
122        }; //self.get_storage(tail_value as usize);
123        let stored_value = storage.load(Ordering::Relaxed);
124        if stored_value != QUEUE_NULL {
125            return false;
126        }
127        storage.store(v, Ordering::Relaxed);
128        tail.write(tail_value.wrapping_add(1) & self.buffer_capacity_mask);
129        return true;
130    }
131
132    pub fn dequeue(&self) -> Option<NonZeroUsize> {
133        let mut head = self.head.lock();
134        let head_value = head.read();
135        let storage = unsafe {
136            self.buffer
137                .as_ptr()
138                .offset(head_value as isize)
139                .as_ref()
140                .unwrap()
141        }; //self.get_storage(head_value as usize);
142        let stored_value = storage.load(Ordering::Relaxed);
143        if stored_value == QUEUE_NULL {
144            return None;
145        }
146        storage.store(QUEUE_NULL, Ordering::Relaxed);
147        head.write(head_value.wrapping_add(1) & self.buffer_capacity_mask);
148        unsafe {
149            return Some(NonZeroUsize::new_unchecked(stored_value));
150        }
151    }
152}
153
// SAFETY(review): all shared mutation goes through the head/tail IndexSpinlocks
// and the atomic slot cells, so cross-thread sharing looks sound — assumes
// IndexSpinlock is itself a correct lock; confirm in crate::sync.
unsafe impl<'a> Send for QueueUsize<'a> {}
unsafe impl<'a> Sync for QueueUsize<'a> {}
156
157#[repr(C)]
158pub struct QueueU32<'a> {
159    _cache_pad_0: [u8; 64],
160    buffer: Unique<AtomicU32>,
161    // buffer_ptr : *const AtomicUsize,
162    capacity: u32,
163    // buffer_ptr : *const AtomicUsize,
164    buffer_capacity_mask: u32,
165    _cache_pad_1: [u8; 64],
166    head: IndexSpinlock,
167    _cache_pad_2: [u8; 64],
168    tail: IndexSpinlock,
169    _cache_pad_3: [u8; 64],
170    _lifetime: PhantomData<&'a AtomicUsize>,
171}
172
173pub const QUEUE_U32_NULL: u32 = 0xFFFFFFFF;
174impl<'a> QueueU32<'a> {
175    // const CAPACITY_MASK : u32 = CAPACITY as u32 - 1;
176
177    // #[cfg(any(test, feature = "std"))]
178    // pub fn new(capacity: usize) -> QueueU32 {
179    //     return QueueU32 {
180    //         head: IndexSpinlock::new(0),
181    //         tail: IndexSpinlock::new(0),
182    //         buffer: Unique::new(slice),
183    //         capacity: capacity as u32,
184    //         // buffer_ptr : slice.as_ptr() as *const AtomicUsize,
185    //         buffer_capacity_mask: capacity as u32 - 1,
186    //         _cache_pad_0: [0; 64],
187    //         _cache_pad_1: [0; 64],
188    //         _cache_pad_2: [0; 64],
189    //         _cache_pad_3: [0; 64],
190    //     };
191
192    // }
193    /// This method is a kludge to work around lack of stable const-generics, const unions, etc.  
194    /// It is up to the caller to ensure that the pointer passed in is truly static, and is not mutated externally.
195    /// Capacity must be a non-zero power of two.
196    pub const unsafe fn from_static(slice: &'a *mut AtomicU32, capacity: usize) -> QueueU32<'a> {
197        //pub const fn new(buffer_ptr : *const usize, capacity : usize)->Queue{
198
199        return QueueU32 {
200            head: IndexSpinlock::new(0),
201            tail: IndexSpinlock::new(0),
202            buffer: Unique::new(*slice),
203            capacity: capacity as u32,
204            // buffer_ptr : slice.as_ptr() as *const AtomicUsize,
205            buffer_capacity_mask: capacity as u32 - 1,
206            _cache_pad_0: [0; 64],
207            _cache_pad_1: [0; 64],
208            _cache_pad_2: [0; 64],
209            _cache_pad_3: [0; 64],
210            _lifetime: PhantomData,
211        };
212    }
213    pub fn clear(&self) {
214        let mut tail = self.tail.lock();
215        let mut head = self.head.lock();
216        for i in 0..self.capacity {
217            unsafe {
218                self.buffer
219                    .as_ptr()
220                    .offset(i as isize)
221                    .as_ref()
222                    .unwrap()
223                    .store(QUEUE_U32_NULL, Ordering::Relaxed);
224            }
225        }
226        tail.write(0);
227        head.write(0);
228    }
229
230    pub fn enqueue(&self, value: u32) -> bool {
231        debug_assert_ne!(value, QUEUE_U32_NULL);
232
233        let mut tail = self.tail.lock();
234        let tail_value = tail.read();
235
236        let storage = unsafe {
237            self.buffer
238                .as_ptr()
239                .offset(tail_value as isize)
240                .as_ref()
241                .unwrap()
242        }; //self.get_storage(tail_value as usize);
243        let stored_value = storage.load(Ordering::Relaxed);
244        if stored_value != QUEUE_U32_NULL {
245            return false;
246        }
247        storage.store(value, Ordering::Relaxed);
248        tail.write(tail_value.wrapping_add(1) & self.buffer_capacity_mask);
249        return true;
250    }
251
252    pub fn dequeue(&self) -> Option<u32> {
253        let mut head = self.head.lock();
254        let head_value = head.read();
255        let storage = unsafe {
256            self.buffer
257                .as_ptr()
258                .offset(head_value as isize)
259                .as_ref()
260                .unwrap()
261        }; //self.get_storage(head_value as usize);
262        let stored_value = storage.load(Ordering::Relaxed);
263        if stored_value == QUEUE_U32_NULL {
264            return None;
265        }
266        storage.store(QUEUE_U32_NULL, Ordering::Relaxed);
267        head.write(head_value.wrapping_add(1) & self.buffer_capacity_mask);
268
269        return Some(stored_value);
270    }
271}
272
// SAFETY(review): all shared mutation goes through the head/tail IndexSpinlocks
// and the atomic slot cells, so cross-thread sharing looks sound — assumes
// IndexSpinlock is itself a correct lock; confirm in crate::sync.
unsafe impl<'a> Send for QueueU32<'a> {}
unsafe impl<'a> Sync for QueueU32<'a> {}
275
// Test-only submodule; compiled only when running the crate's tests.
#[cfg(test)]
mod test;