1#![cfg_attr(
70 all(nightly, target_arch = "aarch64"),
71 feature(stdarch_aarch64_prefetch)
72)]
73use core::alloc::Layout;
74use core::cell::UnsafeCell;
75use core::marker::PhantomData;
76use core::mem::{MaybeUninit, size_of};
77use core::ptr;
78use core::sync::atomic::{AtomicUsize, Ordering};
79use std::alloc::{alloc, dealloc, handle_alloc_error};
80use std::sync::Arc;
81
/// Pads and aligns a value to the assumed cache-line size of the target,
/// preventing false sharing between adjacent fields.
///
/// The `align` chosen per architecture must stay in sync with the
/// `CACHE_LINE_SIZE` constants below: 128 bytes on x86_64/aarch64/arm64ec/
/// powerpc64, 32 bytes on several small/embedded targets, 64 bytes otherwise.
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "arm64ec",
        target_arch = "powerpc64",
    ),
    repr(C, align(128))
)]
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips32r6",
        target_arch = "mips64",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(C, align(32))
)]
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "arm64ec",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips32r6",
        target_arch = "mips64",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "hexagon",
    )),
    repr(C, align(64))
)]
struct CachePadded<T>(T);
121
/// Assumed cache-line size for this target, used as the alignment of the
/// ring-buffer allocation. Must match the `align(..)` values on `CachePadded`.
#[cfg(any(
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "arm64ec",
    target_arch = "powerpc64",
))]
const CACHE_LINE_SIZE: usize = 128;

/// Assumed cache-line size on smaller 32-bit-era targets (see `CachePadded`).
#[cfg(any(
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips32r6",
    target_arch = "mips64",
    target_arch = "mips64r6",
    target_arch = "sparc",
    target_arch = "hexagon",
))]
const CACHE_LINE_SIZE: usize = 32;

/// Default cache-line size for every other architecture (see `CachePadded`).
#[cfg(not(any(
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "arm64ec",
    target_arch = "powerpc64",
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips32r6",
    target_arch = "mips64",
    target_arch = "mips64r6",
    target_arch = "sparc",
    target_arch = "hexagon",
)))]
const CACHE_LINE_SIZE: usize = 64;
155
/// Shared state of a bounded single-producer single-consumer ring buffer.
///
/// Each field sits in its own cache line (`CachePadded`) so that the
/// producer's stores to `head` never invalidate the cache line holding the
/// consumer's `tail`, and vice versa.
pub struct FastQueue<T> {
    // Index mask (`capacity - 1`); valid because `capacity` is a power of two,
    // so `i & mask` replaces the more expensive `i % capacity`.
    mask: CachePadded<usize>,

    // Number of slots in the ring; always a power of two and >= 2.
    capacity: CachePadded<usize>,

    // Heap allocation of `capacity` slots, aligned to `CACHE_LINE_SIZE`.
    buffer: CachePadded<*mut MaybeUninit<T>>,

    // Next write position (monotonically increasing; wrapped via `mask`).
    // Stored by the producer with Release, loaded by the consumer with Acquire.
    head: CachePadded<AtomicUsize>,

    // Next read position. Stored by the consumer with Release, loaded by the
    // producer with Acquire.
    tail: CachePadded<AtomicUsize>,

    // Marks logical ownership of `T` values for drop-check, since the buffer
    // is only a raw pointer.
    _pd: PhantomData<T>,
}
175
// SAFETY: the queue owns its buffer, and all element slots are handed between
// threads through the Release/Acquire head/tail protocol, so the shared state
// may be moved to and referenced from other threads whenever `T: Send`.
unsafe impl<T: Send> Send for FastQueue<T> {}
unsafe impl<T: Send> Sync for FastQueue<T> {}
178
179impl<T> FastQueue<T> {
180 #[allow(clippy::new_ret_no_self)]
194 pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
195 let capacity = capacity.next_power_of_two().max(2);
196 let mask = capacity - 1;
197
198 let layout =
199 Layout::from_size_align(capacity * size_of::<MaybeUninit<T>>(), CACHE_LINE_SIZE)
200 .expect("layout");
201 let buffer = unsafe { alloc(layout) as *mut MaybeUninit<T> };
202
203 if buffer.is_null() {
204 handle_alloc_error(layout);
205 }
206
207 let queue = Arc::new(FastQueue {
208 mask: CachePadded(mask),
209 capacity: CachePadded(capacity),
210 buffer: CachePadded(buffer),
211 head: CachePadded(AtomicUsize::new(0)),
212 tail: CachePadded(AtomicUsize::new(0)),
213 _pd: PhantomData,
214 });
215
216 let producer = Producer {
217 queue: CachePadded(Arc::clone(&queue)),
218 head: CachePadded(UnsafeCell::new(0)),
219 cached_tail: CachePadded(UnsafeCell::new(0)),
220 _pd: PhantomData,
221 };
222
223 let consumer = Consumer {
224 queue: CachePadded(queue),
225 tail: CachePadded(UnsafeCell::new(0)),
226 cached_head: CachePadded(UnsafeCell::new(0)),
227 _pd: PhantomData,
228 };
229
230 (producer, consumer)
231 }
232}
233
impl<T> Drop for FastQueue<T> {
    fn drop(&mut self) {
        // `&mut self` means the last Arc is being dropped, so no other thread
        // can touch the queue; Relaxed loads observe the final indices.
        let head = self.head.0.load(Ordering::Relaxed);
        let mut tail = self.tail.0.load(Ordering::Relaxed);

        // Drop every element that was pushed but never popped.
        while tail != head {
            unsafe {
                let index = tail & self.mask.0;
                let slot = self.buffer.0.add(index);
                // SAFETY: slots in [tail, head) were initialized by `push`
                // and not yet consumed by `pop`.
                ptr::drop_in_place((*slot).as_mut_ptr());
            }
            tail = tail.wrapping_add(1);
        }

        // SAFETY: this must reconstruct exactly the layout used in `new`;
        // deallocating with a different layout is undefined behavior.
        unsafe {
            let layout = Layout::from_size_align(
                self.capacity.0 * size_of::<MaybeUninit<T>>(),
                CACHE_LINE_SIZE,
            )
            .expect("layout");
            dealloc(self.buffer.0 as *mut u8, layout);
        }
    }
}
259
/// Write endpoint of the queue; exactly one exists per `FastQueue`.
pub struct Producer<T> {
    // Shared ring buffer, kept alive jointly with the `Consumer` via `Arc`.
    queue: CachePadded<Arc<FastQueue<T>>>,
    // Producer-private head cursor; the shared atomic `queue.head` is only
    // published (Release) after a slot has been fully written.
    head: CachePadded<UnsafeCell<usize>>,
    // Last observed consumer tail; refreshed from the shared atomic only when
    // the queue looks full, keeping the fast path free of cross-core loads.
    cached_tail: CachePadded<UnsafeCell<usize>>,
    _pd: PhantomData<T>,
}
267
268unsafe impl<T: Send> Send for Producer<T> {}
269
270impl<T> Producer<T> {
272 #[inline(always)]
282 pub fn push(&mut self, value: T) -> Result<(), T> {
283 let head = unsafe { *self.head.0.get() };
284 let next_head = head.wrapping_add(1);
285
286 self.prefetch_write(next_head);
287
288 let cached_tail = unsafe { *self.cached_tail.0.get() };
289
290 if next_head.wrapping_sub(cached_tail) > self.queue.0.capacity.0 {
291 let tail = self.queue.0.tail.0.load(Ordering::Acquire);
293
294 if tail != cached_tail {
295 unsafe {
297 *self.cached_tail.0.get() = tail;
298 }
299 }
300
301 if next_head.wrapping_sub(tail) > self.queue.0.capacity.0 {
303 return Err(value);
304 }
305 }
306
307 unsafe {
308 let index = head & self.queue.0.mask.0;
309 let slot = self.queue.0.buffer.0.add(index);
310 (*slot).as_mut_ptr().write(value);
311 *self.head.0.get() = next_head;
312 }
313
314 self.queue.0.head.0.store(next_head, Ordering::Release);
315
316 Ok(())
317 }
318
319 #[inline(always)]
331 pub fn len(&self) -> usize {
332 let head = unsafe { *self.head.0.get() };
333 let tail = self.queue.0.tail.0.load(Ordering::Relaxed);
334 head.wrapping_sub(tail)
335 }
336
337 #[inline(always)]
349 pub fn is_empty(&self) -> bool {
350 self.len() == 0
351 }
352
353 #[inline(always)]
365 pub fn is_full(&self) -> bool {
366 self.len() >= self.queue.0.capacity.0
367 }
368
369 #[inline(always)]
370 fn prefetch_write(&self, index: usize) {
371 let slot_index = index & self.queue.0.mask.0;
372 let _slot = unsafe { self.queue.0.buffer.0.add(slot_index) };
373
374 #[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
375 unsafe {
376 core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_T0);
377 }
378
379 #[cfg(all(target_arch = "x86_64", target_feature = "prfchw"))]
380 unsafe {
381 core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_ET0);
382 }
383
384 #[cfg(target_arch = "x86")]
385 unsafe {
386 core::arch::x86::_mm_prefetch(_slot as *const i8, core::arch::x86::_MM_HINT_ET0);
387 }
388
389 #[cfg(all(feature = "unstable", nightly, target_arch = "aarch64"))]
390 unsafe {
391 core::arch::aarch64::_prefetch::<
392 { core::arch::aarch64::_PREFETCH_WRITE },
393 { core::arch::aarch64::_PREFETCH_LOCALITY0 },
394 >(_slot as *const i8);
395 }
396 }
397}
398
/// Read endpoint of the queue; exactly one exists per `FastQueue`.
pub struct Consumer<T> {
    // Shared ring buffer, kept alive jointly with the `Producer` via `Arc`.
    queue: CachePadded<Arc<FastQueue<T>>>,
    // Consumer-private tail cursor; the shared atomic `queue.tail` is only
    // published (Release) after a slot has been read out.
    tail: CachePadded<UnsafeCell<usize>>,
    // Last observed producer head; refreshed from the shared atomic only when
    // the queue looks empty, keeping the fast path free of cross-core loads.
    cached_head: CachePadded<UnsafeCell<usize>>,
    _pd: PhantomData<T>,
}
406
407unsafe impl<T: Send> Send for Consumer<T> {}
408
impl<T> Consumer<T> {
    /// Pops the oldest element, or returns `None` if the queue is empty.
    #[inline(always)]
    pub fn pop(&mut self) -> Option<T> {
        // Private cursor: only this consumer writes it.
        let tail = unsafe { *self.tail.0.get() };

        // Warm the cache line of the *next* slot while handling this one.
        self.prefetch_read(tail.wrapping_add(1));

        let cached_head = unsafe { *self.cached_head.0.get() };

        // Fast path: the cached head says data is available. Only when the
        // cache claims "empty" pay for an Acquire load of the shared head
        // (which pairs with the producer's Release store).
        if tail == cached_head {
            let head = self.queue.0.head.0.load(Ordering::Acquire);

            if head != cached_head {
                unsafe {
                    *self.cached_head.0.get() = head;
                }
            }

            // Re-check with the fresh head; still equal means truly empty.
            if tail == head {
                return None;
            }
        }

        // SAFETY: tail != head, so the slot at `tail & mask` was initialized
        // by `push` (published via Release/Acquire) and not yet consumed.
        let value = unsafe {
            let index = tail & self.queue.0.mask.0;
            let slot = self.queue.0.buffer.0.add(index);
            (*slot).as_ptr().read()
        };

        // Hand the slot back to the producer: Release pairs with the
        // producer's Acquire load of `tail`.
        let next_tail = tail.wrapping_add(1);
        unsafe { *self.tail.0.get() = next_tail };
        self.queue.0.tail.0.store(next_tail, Ordering::Release);

        Some(value)
    }

    /// Returns a reference to the oldest element without removing it, or
    /// `None` if the queue is empty.
    #[inline(always)]
    pub fn peek(&self) -> Option<&T> {
        let tail = unsafe { *self.tail.0.get() };
        let head = self.queue.0.head.0.load(Ordering::Acquire);

        if tail == head {
            return None;
        }

        // SAFETY: non-empty, so the slot at `tail` is initialized. The
        // returned reference borrows `self`, preventing a concurrent `pop`
        // (which needs `&mut self`) from invalidating the slot.
        unsafe {
            let index = tail & self.queue.0.mask.0;
            let slot = self.queue.0.buffer.0.add(index);
            Some(&*(*slot).as_ptr())
        }
    }

    /// Number of elements currently in the queue, from the consumer's view.
    /// Approximate: the producer may push concurrently (Relaxed load of head).
    #[inline(always)]
    pub fn len(&self) -> usize {
        let head = self.queue.0.head.0.load(Ordering::Relaxed);
        let tail = unsafe { *self.tail.0.get() };
        head.wrapping_sub(tail)
    }

    /// True when the queue currently holds no elements (see `len`).
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Best-effort prefetch-for-read of the slot `index` maps to.
    /// Compiles to nothing on targets without a supported intrinsic.
    #[inline(always)]
    fn prefetch_read(&self, index: usize) {
        let slot_index = index & self.queue.0.mask.0;
        let _slot = unsafe { self.queue.0.buffer.0.add(slot_index) };

        #[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
        unsafe {
            core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_T0);
        }

        #[cfg(target_arch = "x86")]
        unsafe {
            core::arch::x86::_mm_prefetch(_slot as *const i8, core::arch::x86::_MM_HINT_T0);
        }

        #[cfg(all(feature = "unstable", nightly, target_arch = "aarch64"))]
        unsafe {
            core::arch::aarch64::_prefetch::<
                { core::arch::aarch64::_PREFETCH_READ },
                { core::arch::aarch64::_PREFETCH_LOCALITY0 },
            >(_slot as *const i8);
        }
    }
}
542
543impl<T> Iterator for Consumer<T> {
544 type Item = T;
545
546 #[inline(always)]
560 fn next(&mut self) -> Option<Self::Item> {
561 self.pop()
562 }
563
564 #[inline(always)]
566 fn size_hint(&self) -> (usize, Option<usize>) {
567 (self.len(), None)
569 }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    // Single-threaded smoke test of push/pop and the empty case.
    #[test]
    fn test_basic_push_pop() {
        let (mut producer, mut consumer) = FastQueue::<usize>::new(2);

        assert!(producer.push(42).is_ok());
        assert_eq!(consumer.pop(), Some(42));
        assert_eq!(consumer.pop(), None);
    }

    // The queue must reject pushes while full and accept them again as soon
    // as a slot is freed, preserving FIFO order throughout.
    #[test]
    fn test_capacity() {
        let (mut producer, mut consumer) = FastQueue::<usize>::new(4);

        // Fill to capacity.
        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());
        assert!(producer.push(4).is_ok());
        // A fifth push must fail while full.
        assert!(producer.push(5).is_err());
        // Freeing one slot makes the retried push succeed.
        assert_eq!(consumer.pop(), Some(1));
        assert!(producer.push(5).is_ok());
        assert_eq!(consumer.pop(), Some(2));
        assert_eq!(consumer.pop(), Some(3));
        assert_eq!(consumer.pop(), Some(4));
        assert_eq!(consumer.pop(), Some(5));
    }

    // Cross-thread FIFO test: one producer thread, one consumer thread,
    // values must arrive complete and in order.
    #[test]
    fn test_concurrent() {
        const COUNT: usize = 1_000_000;
        let (mut producer, mut consumer) = FastQueue::<usize>::new(1024);

        let done = Arc::new(AtomicBool::new(false));
        let done_clone = Arc::clone(&done);

        let producer_thread = thread::spawn(move || {
            for i in 0..COUNT {
                // Spin until a slot frees up.
                while producer.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
            // Release pairs with the consumer's Acquire below, so once the
            // consumer sees `done` it also sees every pushed element.
            done_clone.store(true, Ordering::Release);
        });

        let consumer_thread = thread::spawn(move || {
            let mut count = 0;
            while count < COUNT {
                if let Some(val) = consumer.pop() {
                    assert_eq!(val, count);
                    count += 1;
                } else if done.load(Ordering::Acquire) && consumer.is_empty() {
                    break;
                } else {
                    std::hint::spin_loop();
                }
            }
            assert_eq!(count, COUNT);
        });

        producer_thread.join().unwrap();
        consumer_thread.join().unwrap();
    }

    // Exercises index wrap-around: head/tail pass the end of the ring.
    #[test]
    fn test_wraparound() {
        let (mut producer, mut consumer) = FastQueue::<usize>::new(4);

        for i in 0..4 {
            assert!(producer.push(i).is_ok());
        }

        for i in 0..2 {
            assert_eq!(consumer.pop(), Some(i));
        }

        for i in 4..6 {
            assert!(producer.push(i).is_ok());
        }

        for i in 2..6 {
            assert_eq!(consumer.pop(), Some(i));
        }

        assert!(consumer.pop().is_none());
    }
}