yep_coc/
queue_alloc_helpers.rs

1cfg_if::cfg_if! {
2    if #[cfg(feature = "futex")] {
3        use std::sync::atomic::AtomicI32;
4        use crate::YCFutexQueue;
5    }
6}
7
8use crate::{YCQueue, YCQueueError, YCQueueSharedMeta};
9use std::sync::atomic::{AtomicU16, AtomicU64};
10
/// Heap-owned backing storage for a queue's metadata.
///
/// Every field is an atomic, so the storage can be handed out by shared
/// reference (see `YCQueueSharedMeta::new`) and still be updated in place.
#[derive(Debug)]
pub struct YCQueueOwnedMeta {
    /// Number of slots, as passed to `YCQueueOwnedMeta::new`.
    pub slot_count: AtomicU16,
    /// Size of one slot, as passed to `YCQueueOwnedMeta::new`.
    pub slot_size: AtomicU16,
    /// 64-bit metadata word; initialised to zero.
    pub u64_meta: AtomicU64,
    /// One zero-initialised 64-bit word per 64 slots, rounded up.
    /// NOTE(review): presumably a one-bit-per-slot ownership bitmap — confirm
    /// against the queue implementation.
    pub ownership: Vec<AtomicU64>,
}

impl YCQueueOwnedMeta {
    /// Allocate metadata for a queue of `slot_count_u16` slots of
    /// `slot_size_u16` each.
    ///
    /// `u64_meta` and every ownership word start at zero. The ownership vector
    /// holds exactly `ceil(slot_count_u16 / 64)` words (zero words for a
    /// zero-slot queue).
    pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCQueueOwnedMeta {
        let ownership_words = usize::from(slot_count_u16).div_ceil(u64::BITS as usize);

        // Build exactly `ownership_words` entries. `Vec::with_capacity` is allowed
        // to reserve more than requested, so `capacity()` must never be used as
        // the element count.
        let ownership: Vec<AtomicU64> = (0..ownership_words)
            .map(|_| AtomicU64::new(0))
            .collect();

        YCQueueOwnedMeta {
            slot_count: AtomicU16::new(slot_count_u16),
            slot_size: AtomicU16::new(slot_size_u16),
            u64_meta: AtomicU64::new(0),
            ownership,
        }
    }
}
39
40impl<'a> YCQueueSharedMeta<'a> {
41    pub fn new(meta_ref: &'a YCQueueOwnedMeta) -> YCQueueSharedMeta<'a> {
42        YCQueueSharedMeta {
43            slot_count: &meta_ref.slot_count,
44            slot_size: &meta_ref.slot_size,
45            u64_meta: &meta_ref.u64_meta,
46            ownership: &meta_ref.ownership,
47        }
48    }
49
50    pub fn new_from_mut_ptr(ptr: *mut u8) -> Result<YCQueueSharedMeta<'a>, YCQueueError> {
51        if ptr.is_null() {
52            return Err(YCQueueError::InvalidArgs);
53        }
54
55        let slot_count_ptr = ptr as *mut AtomicU16;
56        let slot_size_ptr = unsafe { slot_count_ptr.add(1) };
57        let u64_meta_ptr = unsafe { slot_size_ptr.add(1) as *mut AtomicU64 };
58
59        let slot_count = unsafe { &*slot_count_ptr };
60        let slot_size = unsafe { &*slot_size_ptr };
61        let u64_meta = unsafe { &*u64_meta_ptr };
62
63        let slot_count_u16 = slot_count.load(std::sync::atomic::Ordering::Acquire);
64        let ownership_count = (slot_count_u16 as usize).div_ceil(u64::BITS as usize);
65        let ownership_ptr = unsafe { u64_meta_ptr.add(1) };
66
67        let ownership_slice =
68            unsafe { std::slice::from_raw_parts_mut(ownership_ptr, ownership_count) };
69
70        Ok(YCQueueSharedMeta {
71            slot_count,
72            slot_size,
73            u64_meta,
74            ownership: ownership_slice,
75        })
76    }
77}
78
79/// A way to hold a YCQueueSharedMeta to share between threads of a particular rust program.
80#[derive(Debug)]
81pub struct YCQueueOwnedData {
82    pub meta: YCQueueOwnedMeta,
83    pub data: Vec<u8>,
84    pub raw_ptr: *mut u8,
85}
86
87impl YCQueueOwnedData {
88    pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCQueueOwnedData {
89        let meta = YCQueueOwnedMeta::new(slot_count_u16, slot_size_u16);
90        let mut data = vec![0_u8; (slot_count_u16 * slot_size_u16) as usize];
91        let raw_ptr = data.as_mut_ptr();
92
93        YCQueueOwnedData {
94            meta,
95            data,
96            raw_ptr,
97        }
98    }
99}
100
101#[derive(Debug)]
102pub struct YCQueueSharedData<'a> {
103    pub meta: YCQueueSharedMeta<'a>,
104    pub data: &'a mut [u8],
105}
106
107/**
108 * Create a YC queue view from non-owned data. Since This relies on referencing
109 * memory we do not own but need to mutably borrow (the queue slots) that is shared with
110 * other threads that will be also mutably borrowing the same slices (though not at the
111 * same time), we allow data access through `unsafe`. It's not clear to me if this *can*
112 * be safe, for inter-process comms you would need to map this memory with unsafe too
113 * since it could "change underneath you". (of course, gated by the atomics and proper
114 * memory barriers, since that is the point of this library.)
115 */
116impl<'a> YCQueueSharedData<'a> {
117    pub fn from_owned_data(owned: &'a YCQueueOwnedData) -> YCQueueSharedData<'a> {
118        let meta = YCQueueSharedMeta::new(&owned.meta);
119        let data = owned.raw_ptr;
120
121        YCQueueSharedData {
122            meta,
123            data: unsafe { std::slice::from_raw_parts_mut(data, owned.data.len()) },
124        }
125    }
126}
127
128impl<'a> YCQueue<'a> {
129    pub fn from_owned_data(owned: &'a YCQueueOwnedData) -> Result<YCQueue<'a>, YCQueueError> {
130        let shared_view = YCQueueSharedData::from_owned_data(owned);
131        YCQueue::new(shared_view.meta, shared_view.data)
132    }
133
134    pub fn from_shared_data(shared: YCQueueSharedData<'a>) -> Result<YCQueue<'a>, YCQueueError> {
135        YCQueue::new(shared.meta, shared.data)
136    }
137}
138
/// Owned storage for a futex-enabled queue: the regular queue storage plus a
/// 32-bit atomic counter.
#[cfg(feature = "futex")]
#[derive(Debug)]
pub struct YCFutexQueueOwnedData {
    /// Owned metadata and slot bytes of the underlying queue.
    pub data: YCQueueOwnedData,
    /// Counter that starts at zero.
    /// NOTE(review): presumably the word the futex waits on — confirm against
    /// `YCFutexQueue`.
    pub count: AtomicI32,
}

#[cfg(feature = "futex")]
impl YCFutexQueueOwnedData {
    /// Allocate queue storage for `slot_count_u16` slots of `slot_size_u16`
    /// each, together with a counter initialised to zero.
    pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCFutexQueueOwnedData {
        YCFutexQueueOwnedData {
            data: YCQueueOwnedData::new(slot_count_u16, slot_size_u16),
            count: AtomicI32::new(0),
        }
    }
}
155
/// Borrowed view of a futex-enabled queue: the queue view plus a reference to
/// its counter.
#[cfg(feature = "futex")]
#[derive(Debug)]
pub struct YCFutexQueueSharedData<'a> {
    /// View over the underlying queue's metadata and slot bytes.
    pub data: YCQueueSharedData<'a>,
    /// Reference to the owner's counter.
    pub count: &'a AtomicI32,
}

#[cfg(feature = "futex")]
impl<'a> YCFutexQueueSharedData<'a> {
    /// Build a view over `futex_queue`, borrowing its queue storage and counter.
    pub fn from_owned_data(futex_queue: &'a YCFutexQueueOwnedData) -> YCFutexQueueSharedData<'a> {
        YCFutexQueueSharedData {
            data: YCQueueSharedData::from_owned_data(&futex_queue.data),
            count: &futex_queue.count,
        }
    }
}
172
#[cfg(feature = "futex")]
impl<'a> YCFutexQueue<'a> {
    /// Build a futex queue from an existing shared view, consuming it.
    ///
    /// The inner queue is created from the view's metadata and slot bytes; an
    /// error from that step is returned unchanged. On success the view's
    /// counter reference is attached to the result.
    pub fn from_shared_data(
        shared: YCFutexQueueSharedData<'a>,
    ) -> Result<YCFutexQueue<'a>, YCQueueError> {
        let YCFutexQueueSharedData { data, count } = shared;

        YCQueue::from_shared_data(data).map(|queue| YCFutexQueue { queue, count })
    }

    /// Build a futex queue directly over the storage held by `owned`.
    ///
    /// The inner queue is created from `owned.data`; an error from that step is
    /// returned unchanged. On success the result borrows `owned.count`.
    pub fn from_owned_data(
        owned: &'a YCFutexQueueOwnedData,
    ) -> Result<YCFutexQueue<'a>, YCQueueError> {
        YCQueue::from_owned_data(&owned.data).map(|queue| YCFutexQueue {
            queue,
            count: &owned.count,
        })
    }
}
197
#[cfg(test)]
mod queue_alloc_helpers_tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Owned metadata and a shared view over it must observe the same atomics.
    #[test]
    fn test_shared_meta() {
        let owned_meta = YCQueueOwnedMeta::new(128, 64);
        let shared_meta: YCQueueSharedMeta<'_> = YCQueueSharedMeta::new(&owned_meta);

        assert_eq!(
            owned_meta.u64_meta.load(Ordering::Acquire),
            shared_meta.u64_meta.load(Ordering::Acquire)
        );

        // Both handles start out reporting identical ownership words.
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            assert_eq!(
                owned_word.load(Ordering::Acquire),
                shared_meta.ownership[idx].load(Ordering::Acquire)
            );
        }

        // Stores through the owner are visible through the view.
        let from_owner: u64 = 12345;
        owned_meta.u64_meta.store(from_owner, Ordering::Release);
        assert_eq!(shared_meta.u64_meta.load(Ordering::Acquire), from_owner);
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            owned_word.store(idx as u64, Ordering::Release);
        }
        for (idx, shared_word) in shared_meta.ownership.iter().enumerate() {
            assert_eq!(idx as u64, shared_word.load(Ordering::Acquire));
        }

        // Stores through the view are visible through the owner.
        let from_view: u64 = 54321;
        shared_meta.u64_meta.store(from_view, Ordering::Release);
        assert_eq!(owned_meta.u64_meta.load(Ordering::Acquire), from_view);
        for (idx, shared_word) in shared_meta.ownership.iter().enumerate() {
            shared_word.store(from_view + (idx as u64), Ordering::Release);
        }
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            assert_eq!(from_view + (idx as u64), owned_word.load(Ordering::Acquire));
        }
    }

    /// Slot bytes written through a shared view land in the owned buffer, and
    /// vice versa.
    #[test]
    fn test_shared_data() {
        let mut owned_data = YCQueueOwnedData::new(4, 8);

        // Seed the buffer with 0, 1, 2, ... so every byte is distinguishable.
        owned_data
            .data
            .iter_mut()
            .zip(0_u8..)
            .for_each(|(byte, value)| *byte = value);

        let data_len = owned_data.data.len();
        let last = data_len - 1;

        // Mutate through a short-lived shared view.
        {
            let view = YCQueueSharedData::from_owned_data(&owned_data).data;
            assert_eq!(view, owned_data.data.as_slice());

            view[0] = 0xAA;
            view[last] = 0xBB;
        }

        // The owner sees what the view wrote.
        assert_eq!(owned_data.data[0], 0xAA);
        assert_eq!(owned_data.data[last], 0xBB);

        // Now write through the owner...
        let mid = data_len / 2;
        owned_data.data[1] = 0xCC;
        owned_data.data[mid] = 0xDD;

        // ...and read it back through a fresh view.
        let view = YCQueueSharedData::from_owned_data(&owned_data).data;
        assert_eq!(view[1], 0xCC);
        assert_eq!(view[mid], 0xDD);
    }

    /// Same round trip as `test_shared_data`, plus the shared counter.
    #[cfg(feature = "futex")]
    #[test]
    fn test_futex_shared_queue_and_counter() {
        let mut owned = YCFutexQueueOwnedData::new(4, 16);

        // Seed underlying queue data through the owned handle.
        owned
            .data
            .data
            .iter_mut()
            .zip(0_u8..)
            .for_each(|(byte, value)| *byte = value);

        let data_len = owned.data.data.len();
        let last = data_len - 1;
        let mid = data_len / 2;

        // Mutate through the shared handle and ensure the owned buffer sees it.
        {
            let view = YCFutexQueueSharedData::from_owned_data(&owned).data.data;
            assert_eq!(view, owned.data.data.as_slice());

            view[0] = 0xAA;
            view[last] = 0xBB;
        }
        assert_eq!(owned.data.data[0], 0xAA);
        assert_eq!(owned.data.data[last], 0xBB);

        // Mutate through the owned handle and verify a new view reflects it.
        owned.data.data[1] = 0xCC;
        owned.data.data[mid] = 0xDD;
        {
            let view_again = YCFutexQueueSharedData::from_owned_data(&owned).data.data;
            assert_eq!(view_again[1], 0xCC);
            assert_eq!(view_again[view_again.len() / 2], 0xDD);
        }

        // The atomic counter is shared in both directions.
        owned.count.store(123, Ordering::Release);
        {
            let shared = YCFutexQueueSharedData::from_owned_data(&owned);
            assert_eq!(shared.count.load(Ordering::Acquire), 123);

            shared.count.store(77, Ordering::Release);
        }
        assert_eq!(owned.count.load(Ordering::Acquire), 77);
    }
}