ferrilog_core/
spsc_queue.rs1use core::mem::{MaybeUninit, size_of};
2use std::sync::atomic::{AtomicU32, Ordering};
3
/// Number of bytes in one mebibyte (2^20).
const MIB: usize = 1 << 20;

/// Total size, in bytes, of the queue's backing storage.
///
/// The value is selected at compile time from the `buffer-<N>m` Cargo
/// features. If several of them are enabled the largest one wins, and if none
/// is enabled the size falls back to 2 MiB.
pub const BUFFER_SIZE: usize = if cfg!(feature = "buffer-64m") {
    64 * MIB
} else if cfg!(feature = "buffer-32m") {
    32 * MIB
} else if cfg!(feature = "buffer-16m") {
    16 * MIB
} else if cfg!(feature = "buffer-8m") {
    8 * MIB
} else if cfg!(feature = "buffer-4m") {
    4 * MIB
} else if cfg!(feature = "buffer-2m") {
    2 * MIB
} else if cfg!(feature = "buffer-1m") {
    MIB
} else {
    // No size feature selected: default to 2 MiB.
    2 * MIB
};

// Compile-time sanity checks on the selected size.
const _: () = {
    assert!(BUFFER_SIZE.is_power_of_two(), "BUFFER_SIZE must be a power of two");
    assert!(BUFFER_SIZE >= MIB, "BUFFER_SIZE must be at least 1 MiB");
    assert!(BUFFER_SIZE <= 64 * MIB, "BUFFER_SIZE must not exceed 64 MiB");
};
98
99pub const HEADER_SIZE: usize = core::mem::size_of::<MessageHeader>();
101
102pub const BLOCK_COUNT: u32 = (BUFFER_SIZE / HEADER_SIZE) as u32;
104
105pub const PAYLOAD_INFO_OFFSET: usize = 0;
107
108pub const PAYLOAD_TIMESTAMP_OFFSET: usize = PAYLOAD_INFO_OFFSET + size_of::<usize>();
110
111pub const PAYLOAD_ARGS_OFFSET: usize = PAYLOAD_TIMESTAMP_OFFSET + size_of::<i64>();
113
114pub const PAYLOAD_HEADER_SIZE: usize = PAYLOAD_ARGS_OFFSET;
116
/// Header stored in the block immediately before every message payload.
///
/// `#[repr(C)]` with two 32-bit fields makes the header exactly 8 bytes, the
/// same size as one `u64` element of the queue's storage array.
#[repr(C)]
pub struct MessageHeader {
    /// Message size in bytes, header included, once the message is published.
    ///
    /// The queue code gives two values a special meaning: `0` is treated by
    /// the consumer as "nothing to read here yet", and `1` is the marker the
    /// producer writes when it wraps around to block 0.
    pub size: AtomicU32,
    /// Unused; pads the header out to 8 bytes.
    _padding: u32,
}
136
137impl MessageHeader {
138 #[inline(always)]
141 pub fn push(&self, payload_size: u32) {
142 self.size.store(payload_size + HEADER_SIZE as u32, Ordering::Release);
143 }
144
145 #[inline(always)]
147 pub fn payload_pointer(&self) -> *mut u8 {
148 unsafe { (self as *const Self).add(1) as *mut u8 }
149 }
150}
151
152#[repr(C)]
159pub struct SpscVarQueue {
160 storage: [MaybeUninit<u64>; BLOCK_COUNT as usize],
162 pub(crate) write_index: u32,
164 pub(crate) free_write_count: u32,
166 _cache_line_padding: [u8; 120],
168 read_index: AtomicU32,
170}
171
172impl SpscVarQueue {
173 #[inline(always)]
175 fn header_ptr_at(&self, block_index: u32) -> *const MessageHeader {
176 debug_assert!(block_index < BLOCK_COUNT);
177 unsafe { self.storage.as_ptr().add(block_index as usize) as *const MessageHeader }
178 }
179
180 #[inline(always)]
182 fn header_mut_ptr_at(&mut self, block_index: u32) -> *mut MessageHeader {
183 debug_assert!(block_index < BLOCK_COUNT);
184 unsafe { self.storage.as_mut_ptr().add(block_index as usize) as *mut MessageHeader }
185 }
186
187 #[inline(always)]
192 pub unsafe fn producer_alloc(this: *mut Self, size: u32) -> Option<*mut MessageHeader> {
193 unsafe {
194 let queue = &mut *this;
195 let block_size = (size + HEADER_SIZE as u32 * 2 - 1) / HEADER_SIZE as u32;
196
197 if core::intrinsics::unlikely(block_size >= queue.free_write_count) {
200 let read_index_cache = queue.read_index.load(Ordering::Acquire);
201
202 if read_index_cache <= queue.write_index {
203 queue.free_write_count = BLOCK_COUNT - queue.write_index;
204 if block_size >= queue.free_write_count && read_index_cache != 0 {
205 (*queue.header_mut_ptr_at(0)).size.store(0, Ordering::Release);
207 (*queue.header_mut_ptr_at(queue.write_index))
208 .size
209 .store(1, Ordering::Release);
210 queue.write_index = 0;
211 queue.free_write_count = read_index_cache;
212 }
213 } else {
214 queue.free_write_count = read_index_cache - queue.write_index;
215 }
216
217 if queue.free_write_count <= block_size {
218 return None;
219 }
220 }
221
222 let result = queue.header_mut_ptr_at(queue.write_index);
223 queue.write_index += block_size;
224 queue.free_write_count -= block_size;
225 (*queue.header_mut_ptr_at(queue.write_index)).size.store(0, Ordering::Relaxed);
226 Some(result)
227 }
228 }
229
230 #[inline]
236 pub unsafe fn consumer_front(this: *const Self) -> Option<*const MessageHeader> {
237 unsafe {
238 let queue = &*this;
239 let index = queue.read_index.load(Ordering::Relaxed);
240 let size = (*queue.header_ptr_at(index)).size.load(Ordering::Acquire);
241
242 if size == 1 {
243 let size_at_zero = (*queue.header_ptr_at(0)).size.load(Ordering::Acquire);
244 if size_at_zero == 0 { None } else { Some(queue.header_ptr_at(0)) }
245 } else if size == 0 {
246 None
247 } else {
248 Some(queue.header_ptr_at(index))
249 }
250 }
251 }
252
253 #[inline]
258 pub unsafe fn consumer_pop(this: *mut Self) {
259 unsafe {
260 let queue = &mut *this;
261 let current_read_index = queue.read_index.load(Ordering::Relaxed);
262 let current_size =
263 (*queue.header_ptr_at(current_read_index)).size.load(Ordering::Relaxed);
264
265 if current_size == 1 {
266 let size_at_zero = (*queue.header_ptr_at(0)).size.load(Ordering::Relaxed);
267 let block_size = size_at_zero.div_ceil(HEADER_SIZE as u32);
268 queue.read_index.store(block_size, Ordering::Release);
269 } else {
270 let block_size = current_size.div_ceil(HEADER_SIZE as u32);
271 queue.read_index.store(current_read_index + block_size, Ordering::Release);
272 }
273 }
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Allocates a zeroed queue on the heap (it is far too large for the stack)
    /// and marks every block as free for the producer.
    fn alloc_queue_on_heap() -> *mut SpscVarQueue {
        unsafe {
            let layout = std::alloc::Layout::new::<SpscVarQueue>();
            let pointer = std::alloc::alloc_zeroed(layout) as *mut SpscVarQueue;
            if pointer.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            (*pointer).free_write_count = BLOCK_COUNT;
            pointer
        }
    }

    /// Releases a queue obtained from `alloc_queue_on_heap`.
    unsafe fn free_queue(pointer: *mut SpscVarQueue) {
        unsafe {
            let layout = std::alloc::Layout::new::<SpscVarQueue>();
            std::alloc::dealloc(pointer as *mut u8, layout);
        }
    }

    #[test]
    fn test_basic_alloc_push_front_pop() {
        unsafe {
            let queue = alloc_queue_on_heap();

            let header = SpscVarQueue::producer_alloc(queue, 16).unwrap();
            assert!(!header.is_null());

            let payload = (*header).payload_pointer();
            (payload as *mut u64).write_unaligned(0xDEADBEEF);
            (*header).push(16);

            let front = SpscVarQueue::consumer_front(queue).unwrap();
            let front_payload = (*front).payload_pointer() as *const u64;
            assert_eq!(front_payload.read_unaligned(), 0xDEADBEEF);

            SpscVarQueue::consumer_pop(queue);
            assert!(SpscVarQueue::consumer_front(queue).is_none());

            free_queue(queue);
        }
    }

    #[test]
    fn test_multiple_messages() {
        unsafe {
            let queue = alloc_queue_on_heap();

            for i in 0..4u32 {
                let header = SpscVarQueue::producer_alloc(queue, 4).unwrap();
                ((*header).payload_pointer() as *mut u32).write_unaligned(i * 10);
                (*header).push(4);
            }

            for i in 0..4u32 {
                let front = SpscVarQueue::consumer_front(queue).unwrap();
                let value = ((*front).payload_pointer() as *const u32).read_unaligned();
                assert_eq!(value, i * 10);
                SpscVarQueue::consumer_pop(queue);
            }

            assert!(SpscVarQueue::consumer_front(queue).is_none());
            free_queue(queue);
        }
    }

    #[test]
    fn test_queue_full_returns_none() {
        unsafe {
            let queue = alloc_queue_on_heap();

            let big_size = BUFFER_SIZE as u32 / 2;
            let result1 = SpscVarQueue::producer_alloc(queue, big_size);
            assert!(result1.is_some());
            let header = result1.unwrap();
            (*header).push(big_size);

            // Half the buffer plus its header is taken and nothing has been
            // consumed, so a second half-buffer message cannot fit.
            let result2 = SpscVarQueue::producer_alloc(queue, big_size);
            assert!(result2.is_none());

            free_queue(queue);
        }
    }

    /// Exercises the wrap-around path: the producer runs out of room at the
    /// tail, restarts at block 0 once the consumer has freed it, and the
    /// consumer follows the wrap marker to find the new message.
    #[test]
    fn test_wrap_around_reuses_start_of_buffer() {
        unsafe {
            let queue = alloc_queue_on_heap();
            let payload_size = BUFFER_SIZE as u32 / 4;

            // Three quarter-buffer payloads (each plus a header block) leave less
            // than a quarter of the buffer at the tail.
            for marker in 1..=3u32 {
                let header = SpscVarQueue::producer_alloc(queue, payload_size).unwrap();
                ((*header).payload_pointer() as *mut u32).write_unaligned(marker);
                (*header).push(payload_size);
            }
            // Nothing consumed yet, so the producer may not wrap.
            assert!(SpscVarQueue::producer_alloc(queue, payload_size).is_none());

            // Consume the first two messages to free the front of the buffer.
            for marker in 1..=2u32 {
                let front = SpscVarQueue::consumer_front(queue).unwrap();
                let value = ((*front).payload_pointer() as *const u32).read_unaligned();
                assert_eq!(value, marker);
                SpscVarQueue::consumer_pop(queue);
            }

            // The next allocation must wrap and hand out block 0, which sits at
            // the very start of the `#[repr(C)]` struct.
            let wrapped = SpscVarQueue::producer_alloc(queue, payload_size).unwrap();
            assert_eq!(wrapped as *const u8, queue as *const u8);
            ((*wrapped).payload_pointer() as *mut u32).write_unaligned(4);
            (*wrapped).push(payload_size);

            // The third message is still ahead of the wrap marker.
            let front = SpscVarQueue::consumer_front(queue).unwrap();
            assert_eq!(((*front).payload_pointer() as *const u32).read_unaligned(), 3);
            SpscVarQueue::consumer_pop(queue);

            // The read cursor now sits on the marker; the front is the wrapped
            // message at block 0.
            let front = SpscVarQueue::consumer_front(queue).unwrap();
            assert_eq!(front as *const u8, queue as *const u8);
            assert_eq!(((*front).payload_pointer() as *const u32).read_unaligned(), 4);
            SpscVarQueue::consumer_pop(queue);

            assert!(SpscVarQueue::consumer_front(queue).is_none());
            free_queue(queue);
        }
    }
}