use std::cell::UnsafeCell;
use std::hint;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
8
/// Assumed cache-line granularity for padding/alignment. 128 covers the
/// common 64-byte line plus adjacent-line prefetchers on modern x86.
/// Must agree with the literal in the `#[repr(align(...))]` attributes below.
const CACHE_LINE: usize = 128;

/// Number of unsuccessful probes a `try_*` operation tolerates before
/// giving up.
const PATIENCE: usize = 10;

/// One ring-buffer cell.
///
/// `turn` is the generation counter that hands the cell back and forth
/// between producers and consumers: `turn == ticket` means "free for the
/// enqueue with this ticket", `turn == ticket + 1` means "holds the value
/// for the dequeue with this ticket". `data` holds the payload while the
/// cell is occupied.
///
/// Aligned to `CACHE_LINE` so neighbouring slots do not false-share.
#[repr(align(128))]
struct Slot<T> {
    turn: AtomicUsize,
    data: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: access to `data` is serialized by the `turn` protocol — a slot is
// written/read only by the single thread that claimed its current ticket,
// with Acquire/Release ordering on `turn` providing the happens-before
// edges. Sharing a Slot across threads therefore only requires the payload
// itself to be sendable.
unsafe impl<T: Send> Sync for Slot<T> {}
19
20#[repr(align(128))]
22pub struct FAAQueue<T> {
23 head: AtomicUsize,
24 _pad1: [u8; CACHE_LINE - 8],
25 tail: AtomicUsize,
26 _pad2: [u8; CACHE_LINE - 8],
27 buffer: Vec<Slot<T>>,
28 mask: usize,
29}
30
31impl<T> FAAQueue<T> {
32 pub fn new(capacity: usize) -> Self {
33 assert!(capacity.is_power_of_two());
34
35 let mut buffer = Vec::with_capacity(capacity);
36 for i in 0..capacity {
37 buffer.push(Slot {
38 turn: AtomicUsize::new(i),
39 data: UnsafeCell::new(MaybeUninit::uninit()),
40 });
41 }
42
43 FAAQueue {
44 head: AtomicUsize::new(0),
45 _pad1: [0; CACHE_LINE - 8],
46 tail: AtomicUsize::new(0),
47 _pad2: [0; CACHE_LINE - 8],
48 buffer,
49 mask: capacity - 1,
50 }
51 }
52
53 #[inline(always)]
54 pub fn enqueue(&self, item: T) -> bool {
55 let mut tail = self.tail.load(Ordering::Relaxed);
56
57 loop {
58 let slot = &self.buffer[tail & self.mask];
59 let turn = slot.turn.load(Ordering::Acquire);
60
61 if turn == tail {
62 match self.tail.compare_exchange_weak(
63 tail,
64 tail + 1,
65 Ordering::Relaxed,
66 Ordering::Relaxed
67 ) {
68 Ok(_) => {
69 unsafe {
70 (*slot.data.get()).write(item);
71 }
72 slot.turn.store(tail + 1, Ordering::Release);
73 return true;
74 }
75 Err(actual) => tail = actual,
76 }
77 } else if turn + self.buffer.len() == tail + 1 {
78 return false; } else {
80 tail = self.tail.load(Ordering::Relaxed);
81 }
82
83 hint::spin_loop();
84 }
85 }
86
87 #[inline(always)]
88 pub fn dequeue(&self) -> Option<T> {
89 let mut head = self.head.load(Ordering::Relaxed);
90
91 loop {
92 let slot = &self.buffer[head & self.mask];
93 let turn = slot.turn.load(Ordering::Acquire);
94
95 if turn == head + 1 {
96 match self.head.compare_exchange_weak(
97 head,
98 head + 1,
99 Ordering::Relaxed,
100 Ordering::Relaxed
101 ) {
102 Ok(_) => {
103 let data = unsafe {
104 (*slot.data.get()).assume_init_read()
105 };
106 slot.turn.store(head + self.buffer.len(), Ordering::Release);
107 return Some(data);
108 }
109 Err(actual) => head = actual,
110 }
111 } else if turn == head {
112 return None; } else {
114 head = self.head.load(Ordering::Relaxed);
115 }
116
117 hint::spin_loop();
118 }
119 }
120
121 #[inline(always)]
122 pub fn try_enqueue(&self, item: T) -> Result<(), T> {
123 let tail = self.tail.fetch_add(1, Ordering::Relaxed);
124 let slot = &self.buffer[tail & self.mask];
125
126 let mut spin = 0;
127 loop {
128 let turn = slot.turn.load(Ordering::Acquire);
129 if turn == tail {
130 unsafe {
131 (*slot.data.get()).write(item);
132 }
133 slot.turn.store(tail + 1, Ordering::Release);
134 return Ok(());
135 }
136
137 spin += 1;
138 if spin > PATIENCE {
139 return Err(item);
140 }
141 hint::spin_loop();
142 }
143 }
144
145 #[inline(always)]
146 pub fn try_dequeue(&self) -> Option<T> {
147 let head = self.head.fetch_add(1, Ordering::Relaxed);
148 let slot = &self.buffer[head & self.mask];
149
150 let mut spin = 0;
151 loop {
152 let turn = slot.turn.load(Ordering::Acquire);
153 if turn == head + 1 {
154 let data = unsafe {
155 (*slot.data.get()).assume_init_read()
156 };
157 slot.turn.store(head + self.buffer.len(), Ordering::Release);
158 return Some(data);
159 }
160
161 spin += 1;
162 if spin > PATIENCE {
163 return None;
164 }
165 hint::spin_loop();
166 }
167 }
168}
169
170unsafe impl<T: Send> Send for FAAQueue<T> {}
171unsafe impl<T: Send> Sync for FAAQueue<T> {}