yep_coc/
queue_alloc_helpers.rs1cfg_if::cfg_if! {
2 if #[cfg(feature = "futex")] {
3 use std::sync::atomic::AtomicI32;
4 use crate::YCFutexQueue;
5 }
6}
7
8use crate::{YCQueue, YCQueueError, YCQueueSharedMeta};
9use std::sync::atomic::{AtomicU16, AtomicU64};
10
/// Heap-owned metadata for a queue.
///
/// All fields are atomics so that borrowed views (see `YCQueueSharedMeta::new`)
/// can read and update them through shared references.
#[derive(Debug)]
pub struct YCQueueOwnedMeta {
    /// Slot count supplied to [`YCQueueOwnedMeta::new`].
    pub slot_count: AtomicU16,
    /// Slot size supplied to [`YCQueueOwnedMeta::new`] (presumably in bytes — confirm
    /// against the queue implementation).
    pub slot_size: AtomicU16,
    /// 64-bit metadata word, initialised to zero. Its encoding is defined by the
    /// queue implementation, which is not visible in this file.
    pub u64_meta: AtomicU64,
    /// Zero-initialised words, one `u64` per 64 slots (rounded up).
    pub ownership: Vec<AtomicU64>,
}

impl YCQueueOwnedMeta {
    /// Creates zeroed metadata for a queue of `slot_count_u16` slots of
    /// `slot_size_u16` each.
    ///
    /// `ownership` always holds exactly `ceil(slot_count_u16 / 64)` words; a slot
    /// count of zero yields an empty vector.
    pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCQueueOwnedMeta {
        // Derive the length from the slot count rather than from
        // `Vec::capacity()`: `with_capacity` is allowed to over-allocate, so the
        // capacity is not a reliable element count.
        let ownership_words = usize::from(slot_count_u16).div_ceil(u64::BITS as usize);
        let ownership: Vec<AtomicU64> = (0..ownership_words)
            .map(|_| AtomicU64::new(0))
            .collect();

        YCQueueOwnedMeta {
            slot_count: AtomicU16::new(slot_count_u16),
            slot_size: AtomicU16::new(slot_size_u16),
            u64_meta: AtomicU64::new(0),
            ownership,
        }
    }
}
39
40impl<'a> YCQueueSharedMeta<'a> {
41 pub fn new(meta_ref: &'a YCQueueOwnedMeta) -> YCQueueSharedMeta<'a> {
42 YCQueueSharedMeta {
43 slot_count: &meta_ref.slot_count,
44 slot_size: &meta_ref.slot_size,
45 u64_meta: &meta_ref.u64_meta,
46 ownership: &meta_ref.ownership,
47 }
48 }
49
50 pub fn new_from_mut_ptr(ptr: *mut u8) -> Result<YCQueueSharedMeta<'a>, YCQueueError> {
51 if ptr.is_null() {
52 return Err(YCQueueError::InvalidArgs);
53 }
54
55 let slot_count_ptr = ptr as *mut AtomicU16;
56 let slot_size_ptr = unsafe { slot_count_ptr.add(1) };
57 let u64_meta_ptr = unsafe { slot_size_ptr.add(1) as *mut AtomicU64 };
58
59 let slot_count = unsafe { &*slot_count_ptr };
60 let slot_size = unsafe { &*slot_size_ptr };
61 let u64_meta = unsafe { &*u64_meta_ptr };
62
63 let slot_count_u16 = slot_count.load(std::sync::atomic::Ordering::Acquire);
64 let ownership_count = (slot_count_u16 as usize).div_ceil(u64::BITS as usize);
65 let ownership_ptr = unsafe { u64_meta_ptr.add(1) };
66
67 let ownership_slice =
68 unsafe { std::slice::from_raw_parts_mut(ownership_ptr, ownership_count) };
69
70 Ok(YCQueueSharedMeta {
71 slot_count,
72 slot_size,
73 u64_meta,
74 ownership: ownership_slice,
75 })
76 }
77}
78
79#[derive(Debug)]
81pub struct YCQueueOwnedData {
82 pub meta: YCQueueOwnedMeta,
83 pub data: Vec<u8>,
84 pub raw_ptr: *mut u8,
85}
86
87impl YCQueueOwnedData {
88 pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCQueueOwnedData {
89 let meta = YCQueueOwnedMeta::new(slot_count_u16, slot_size_u16);
90 let mut data = vec![0_u8; (slot_count_u16 * slot_size_u16) as usize];
91 let raw_ptr = data.as_mut_ptr();
92
93 YCQueueOwnedData {
94 meta,
95 data,
96 raw_ptr,
97 }
98 }
99}
100
101#[derive(Debug)]
102pub struct YCQueueSharedData<'a> {
103 pub meta: YCQueueSharedMeta<'a>,
104 pub data: &'a mut [u8],
105}
106
107impl<'a> YCQueueSharedData<'a> {
117 pub fn from_owned_data(owned: &'a YCQueueOwnedData) -> YCQueueSharedData<'a> {
118 let meta = YCQueueSharedMeta::new(&owned.meta);
119 let data = owned.raw_ptr;
120
121 YCQueueSharedData {
122 meta,
123 data: unsafe { std::slice::from_raw_parts_mut(data, owned.data.len()) },
124 }
125 }
126}
127
128impl<'a> YCQueue<'a> {
129 pub fn from_owned_data(owned: &'a YCQueueOwnedData) -> Result<YCQueue<'a>, YCQueueError> {
130 let shared_view = YCQueueSharedData::from_owned_data(owned);
131 YCQueue::new(shared_view.meta, shared_view.data)
132 }
133
134 pub fn from_shared_data(shared: YCQueueSharedData<'a>) -> Result<YCQueue<'a>, YCQueueError> {
135 YCQueue::new(shared.meta, shared.data)
136 }
137}
138
/// Heap-owned storage for a futex-backed queue: the regular queue storage plus
/// a 32-bit atomic counter.
#[cfg(feature = "futex")]
#[derive(Debug)]
pub struct YCFutexQueueOwnedData {
    /// Queue metadata and slot buffer.
    pub data: YCQueueOwnedData,
    /// Counter, initialised to zero (presumably the futex word — confirm against
    /// `YCFutexQueue`).
    pub count: AtomicI32,
}

#[cfg(feature = "futex")]
impl YCFutexQueueOwnedData {
    /// Allocates queue storage for the given slot count and slot size and pairs
    /// it with a counter set to zero.
    pub fn new(slot_count_u16: u16, slot_size_u16: u16) -> YCFutexQueueOwnedData {
        YCFutexQueueOwnedData {
            data: YCQueueOwnedData::new(slot_count_u16, slot_size_u16),
            count: AtomicI32::new(0),
        }
    }
}
155
/// Borrowed view of a futex-backed queue's storage: the queue view plus a
/// reference to the counter.
#[cfg(feature = "futex")]
#[derive(Debug)]
pub struct YCFutexQueueSharedData<'a> {
    /// View over the queue metadata and slot bytes.
    pub data: YCQueueSharedData<'a>,
    /// Reference to the counter.
    pub count: &'a AtomicI32,
}

#[cfg(feature = "futex")]
impl<'a> YCFutexQueueSharedData<'a> {
    /// Creates a view over `futex_queue`: a `YCQueueSharedData` over its queue
    /// storage and a reference to its `count` field.
    pub fn from_owned_data(futex_queue: &'a YCFutexQueueOwnedData) -> YCFutexQueueSharedData<'a> {
        YCFutexQueueSharedData {
            data: YCQueueSharedData::from_owned_data(&futex_queue.data),
            count: &futex_queue.count,
        }
    }
}
172
#[cfg(feature = "futex")]
impl<'a> YCFutexQueue<'a> {
    /// Builds a futex queue from a shared view, consuming it.
    ///
    /// The inner queue is created with `YCQueue::new`; any error it returns is
    /// passed through unchanged.
    pub fn from_shared_data(
        shared: YCFutexQueueSharedData<'a>,
    ) -> Result<YCFutexQueue<'a>, YCQueueError> {
        let YCFutexQueueSharedData { data, count } = shared;
        let YCQueueSharedData { meta, data: bytes } = data;

        YCQueue::new(meta, bytes).map(|queue| YCFutexQueue { queue, count })
    }

    /// Builds a futex queue that borrows its storage and counter from `owned`.
    ///
    /// The inner queue is created with `YCQueue::from_owned_data`; any error it
    /// returns is passed through unchanged.
    pub fn from_owned_data(
        owned: &'a YCFutexQueueOwnedData,
    ) -> Result<YCFutexQueue<'a>, YCQueueError> {
        let count = &owned.count;

        YCQueue::from_owned_data(&owned.data).map(|queue| YCFutexQueue { queue, count })
    }
}
197
#[cfg(test)]
mod queue_alloc_helpers_tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// A `YCQueueSharedMeta` built from owned metadata must observe, and be able
    /// to modify, the very same atomics as the owner.
    #[test]
    fn test_shared_meta() {
        let owned_meta = YCQueueOwnedMeta::new(128, 64);
        let shared_meta: YCQueueSharedMeta<'_> = YCQueueSharedMeta::new(&owned_meta);
        let word_count = owned_meta.ownership.len();

        // Both handles start out reporting identical values.
        assert_eq!(
            owned_meta.u64_meta.load(Ordering::Acquire),
            shared_meta.u64_meta.load(Ordering::Acquire)
        );
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            assert_eq!(
                owned_word.load(Ordering::Acquire),
                shared_meta.ownership[idx].load(Ordering::Acquire)
            );
        }

        // Stores made through the owner are visible through the view.
        let first_marker: u64 = 12345;
        owned_meta.u64_meta.store(first_marker, Ordering::Release);
        assert_eq!(shared_meta.u64_meta.load(Ordering::Acquire), first_marker);
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            owned_word.store(idx as u64, Ordering::Release);
        }
        for idx in 0..word_count {
            assert_eq!(idx as u64, shared_meta.ownership[idx].load(Ordering::Acquire));
        }

        // Stores made through the view are visible through the owner.
        let second_marker: u64 = 54321;
        shared_meta.u64_meta.store(second_marker, Ordering::Release);
        assert_eq!(owned_meta.u64_meta.load(Ordering::Acquire), second_marker);
        for idx in 0..word_count {
            shared_meta.ownership[idx].store(second_marker + (idx as u64), Ordering::Release);
        }
        for (idx, owned_word) in owned_meta.ownership.iter().enumerate() {
            assert_eq!(
                second_marker + (idx as u64),
                owned_word.load(Ordering::Acquire)
            );
        }
    }

    /// The byte slice exposed by `YCQueueSharedData` must alias the owner's buffer
    /// in both directions.
    #[test]
    fn test_shared_data() {
        let mut owned_data = YCQueueOwnedData::new(4, 8);
        let data_len = owned_data.data.len();

        // Fill the buffer with a recognisable pattern.
        for idx in 0..data_len {
            owned_data.data[idx] = idx as u8;
        }

        {
            let YCQueueSharedData { data: view, .. } =
                YCQueueSharedData::from_owned_data(&owned_data);
            assert_eq!(view, owned_data.data.as_slice());

            // Write through the view.
            view[0] = 0xAA;
            view[data_len - 1] = 0xBB;
        }

        // The owner sees the view's writes.
        assert_eq!(owned_data.data[0], 0xAA);
        assert_eq!(owned_data.data[data_len - 1], 0xBB);

        // Write through the owner, then check a fresh view sees it.
        let mid = data_len / 2;
        owned_data.data[1] = 0xCC;
        owned_data.data[mid] = 0xDD;

        let YCQueueSharedData { data: view, .. } = YCQueueSharedData::from_owned_data(&owned_data);
        assert_eq!(view[1], 0xCC);
        assert_eq!(view[mid], 0xDD);
    }

    /// Same aliasing checks for the futex variant, plus the shared counter.
    #[cfg(feature = "futex")]
    #[test]
    fn test_futex_shared_queue_and_counter() {
        let mut owned = YCFutexQueueOwnedData::new(4, 16);
        let data_len = owned.data.data.len();

        // Fill the buffer with a recognisable pattern.
        for idx in 0..data_len {
            owned.data.data[idx] = idx as u8;
        }

        {
            let view = YCFutexQueueSharedData::from_owned_data(&owned).data.data;
            assert_eq!(view, owned.data.data.as_slice());

            // Write through the view.
            let last = view.len() - 1;
            view[0] = 0xAA;
            view[last] = 0xBB;
        }
        assert_eq!(owned.data.data[0], 0xAA);
        assert_eq!(owned.data.data[data_len - 1], 0xBB);

        // Write through the owner, then check a fresh view sees it.
        owned.data.data[1] = 0xCC;
        owned.data.data[data_len / 2] = 0xDD;
        {
            let view_again = YCFutexQueueSharedData::from_owned_data(&owned).data.data;
            let mid = view_again.len() / 2;
            assert_eq!(view_again[1], 0xCC);
            assert_eq!(view_again[mid], 0xDD);
        }

        // The counter is shared in both directions as well.
        owned.count.store(123, Ordering::Release);
        {
            let shared = YCFutexQueueSharedData::from_owned_data(&owned);
            assert_eq!(shared.count.load(Ordering::Acquire), 123);

            shared.count.store(77, Ordering::Release);
        }
        assert_eq!(owned.count.load(Ordering::Acquire), 77);
    }
}