1use alloc::boxed::Box;
2use core::{marker::PhantomData, ptr::null_mut};
3
4use crossbeam_utils::CachePadded;
5
6use crate::{
7 Growable,
8 MPMCQueue,
9 core::{
10 AsPackedValue,
11 queue::QueueCore,
12 slots::{Auto, SlotType},
13 },
14 growable::NewSized,
15 owned::buffer::BoxedBuffer,
16 sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
17 utils::Backoff,
18};
19
/// Shared state of a growable queue built from two inner queues of type `Q`.
///
/// The two inner queues live in `cores` and are addressed by `epoch % 2`.
/// Producers use the slot selected by `push_epoch`; consumers use the slot
/// selected by `pop_epoch` until it is drained and then move on to the push
/// epoch. `grow_by` installs a larger queue in the slot that `push_epoch` does
/// not currently select and then increments `push_epoch`.
pub(crate) struct GrowableQueueCore<T, Q, S = Auto> {
    /// Heap-allocated inner queues (raw pointers obtained from
    /// `Box::into_raw`), indexed by `epoch % 2`.
    cores: [AtomicPtr<Q>; 2],
    /// Epoch whose slot receives pushes; incremented by `grow_by`.
    push_epoch: CachePadded<AtomicUsize>,
    /// Epoch whose slot is being drained by `pop`; incremented by `pop` once
    /// that slot is observed empty with no pushes in flight.
    pop_epoch: CachePadded<AtomicUsize>,
    /// Per-slot count of `push` calls currently in flight.
    active_pushes: CachePadded<[AtomicUsize; 2]>,
    /// Per-slot count of readers registered through `register_reader`.
    active_reads: CachePadded<[AtomicUsize; 2]>,
    /// Set while a `grow_by` call is in progress so that only one runs at a
    /// time.
    is_resizing: AtomicBool,
    /// Marker carrying the slot strategy `S` and item type `T`, neither of
    /// which is stored directly in this struct.
    _slot: PhantomData<(S, T)>,
}
30
31impl<T, Q> GrowableQueueCore<T, Q, Auto>
32where
33 Q: NewSized,
34{
35 pub(crate) fn with_slot<S>(size: usize) -> GrowableQueueCore<T, Q, S> {
38 GrowableQueueCore {
39 cores: [
40 AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(size)))),
41 AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(1)))),
42 ],
43 active_pushes: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
44 active_reads: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
45 push_epoch: AtomicUsize::new(0).into(),
46 pop_epoch: AtomicUsize::new(0).into(),
47 is_resizing: AtomicBool::new(false),
48 _slot: PhantomData,
49 }
50 }
51}
52
53impl<T, Q, S> Drop for GrowableQueueCore<T, Q, S> {
54 fn drop(&mut self) {
55 let left = self.cores[0].swap(null_mut(), Ordering::Acquire);
56 _ = unsafe { Box::from_raw(left) };
60
61 let right = self.cores[1].swap(null_mut(), Ordering::Acquire);
62 _ = unsafe { Box::from_raw(right) };
66 }
67}
68
impl<T, Q, S> Growable for GrowableQueueCore<T, Q, S>
where
    Q: NewSized + MPMCQueue<Item = T>,
{
    /// Tries to enlarge the queue by `by` slots.
    ///
    /// A new inner queue of size `self.capacity() + by` is installed in the
    /// slot that the current push epoch does not use, and `push_epoch` is then
    /// incremented so that later pushes go to the new queue. Items already in
    /// the previous queue are not moved; `pop` keeps draining it through
    /// `pop_epoch`.
    ///
    /// Returns `true` when the new queue was installed. Returns `false`
    /// without changing anything when:
    /// - `pop_epoch` has not yet caught up with `push_epoch`,
    /// - readers are still registered on the slot that would be replaced,
    /// - another `grow_by` call holds the `is_resizing` flag, or
    /// - `push_epoch` changed before the flag was acquired.
    fn grow_by(&self, by: usize) -> bool {
        let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
        let push_epoch = self.push_epoch.load(Ordering::Acquire);

        // The slot we would replace is still the one `pop` is draining.
        if pop_epoch != push_epoch {
            return false;
        }

        // Cheap early exit: somebody is registered on the slot to be replaced.
        if self.active_reads[(push_epoch + 1) % 2].load(Ordering::Acquire) != 0 {
            return false;
        }

        // `is_resizing` works as a try-lock: a previous value of `true` means
        // another grow is already running.
        if self.is_resizing.swap(true, Ordering::AcqRel) {
            return false;
        }

        // Re-validate under the flag: a grow may have completed between the
        // first load of `push_epoch` and acquiring the flag.
        if self.push_epoch.load(Ordering::Acquire) != push_epoch {
            self.is_resizing.store(false, Ordering::Release);
            return false;
        }

        // Index of the slot not selected by the current push epoch.
        let old_idx = (push_epoch + 1) % 2;
        let mut backoff = Backoff::new();

        // Wait until every reader registered on that slot has deregistered.
        while self.active_reads[old_idx].load(Ordering::Acquire) != 0 {
            backoff.backoff();
        }

        // Debug builds only: no push is expected to be in flight on the slot
        // that is about to be replaced.
        debug_assert_eq!(
            self.active_pushes[(push_epoch + 1) % 2].load(Ordering::SeqCst),
            0
        );

        // `capacity()` reports the queue of the current push epoch.
        let new_queue = Box::into_raw(Box::new(Q::with_size(self.capacity() + by)));

        // Install the new queue first, then publish it by advancing the push
        // epoch so that `push_epoch % 2` selects the slot just written.
        let old_queue = self.cores[(push_epoch + 1) % 2].swap(new_queue, Ordering::AcqRel);
        self.push_epoch.fetch_add(1, Ordering::Release);

        // SAFETY: `old_queue` was stored in `cores` and every pointer stored
        // there comes from `Box::into_raw`. Exclusive ownership relies on the
        // reader wait above together with the epoch re-check performed by
        // `register_reader`.
        // NOTE(review): readers announce themselves with a `Release`
        // increment followed by a `SeqCst` load — confirm these orderings are
        // strong enough to rule out a reader still using `old_queue` here.
        let q = unsafe { Box::from_raw(old_queue) };

        // Debug builds only: the replaced queue is expected to be empty.
        debug_assert!(q.pop().is_none());

        self.is_resizing.store(false, Ordering::Release);
        true
    }
}
131
132impl<T, Q, S> GrowableQueueCore<T, Q, S> {
133 fn get_queue(&self, epoch: usize) -> &Q {
134 let queue = self.cores[epoch % 2].load(Ordering::Acquire);
135 unsafe { &*queue }
139 }
140
141 fn register_reader(&self, target_epoch: usize) -> bool {
142 self.active_reads[target_epoch % 2].fetch_add(1, Ordering::Release);
143
144 let current_push = self.push_epoch.load(Ordering::SeqCst);
145 let current_pop = self.pop_epoch.load(Ordering::SeqCst);
146
147 if target_epoch != current_push && target_epoch != current_pop {
149 self.deregister_reader(target_epoch);
150 return false;
151 }
152 true
153 }
154
155 fn deregister_reader(&self, epoch: usize) {
156 self.active_reads[epoch % 2].fetch_sub(1, Ordering::Release);
157 }
158}
159
/// Queue operations over the two epoch-indexed inner queues.
///
/// Pushes go to the queue selected by `push_epoch`. Pops drain the queue
/// selected by `pop_epoch` and advance `pop_epoch` once that queue is empty
/// and no push on it is in flight.
impl<T, Q, S> MPMCQueue for GrowableQueueCore<T, Q, S>
where
    Q: MPMCQueue<Item = T>,
{
    type Item = T;

    /// Pushes `item` into the queue of the current push epoch and returns that
    /// queue's result unchanged (so `Err(item)` comes from the inner queue).
    ///
    /// The call retries if `push_epoch` changes between announcing the push
    /// and re-reading the epoch.
    fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
        loop {
            let push_epoch = self.push_epoch.load(Ordering::Acquire);
            // Announce the push on this slot before re-validating the epoch;
            // `pop` checks this counter before advancing `pop_epoch`.
            self.active_pushes[push_epoch % 2].fetch_add(1, Ordering::Release);

            if self.push_epoch.load(Ordering::SeqCst) == push_epoch {
                let r = self.get_queue(push_epoch).push(item);

                self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
                return r;
            }
            // The epoch moved on: withdraw the announcement and retry.
            self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
        }
    }

    /// Removes one item, preferring the queue of the pop epoch while it lags
    /// behind the push epoch.
    ///
    /// Returns `None` only when both epochs are equal and the inner queue of
    /// that epoch returns `None`.
    fn pop(&self) -> Option<Self::Item> {
        loop {
            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
            let push_epoch = self.push_epoch.load(Ordering::Acquire);

            // A grow happened and the previous queue may still hold items.
            if pop_epoch != push_epoch {
                if !self.register_reader(pop_epoch) {
                    continue;
                }

                let item = self.get_queue(pop_epoch).pop();

                self.deregister_reader(pop_epoch);

                if item.is_some() {
                    return item;
                }

                // The previous queue looked empty. Only once no push on it is
                // in flight can a second empty read be taken as final.
                if self.active_pushes[pop_epoch % 2].load(Ordering::Acquire) == 0 {
                    if !self.register_reader(pop_epoch) {
                        continue;
                    }

                    let final_item = self.get_queue(pop_epoch).pop();

                    self.deregister_reader(pop_epoch);

                    if final_item.is_some() {
                        return final_item;
                    }

                    // Move consumers on to the next epoch. The result is
                    // ignored: on failure (another thread advanced it, or a
                    // spurious failure of the weak CAS) the loop re-reads both
                    // epochs anyway.
                    _ = self.pop_epoch.compare_exchange_weak(
                        pop_epoch,
                        pop_epoch + 1,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    );
                }

                continue;
            }

            // Both epochs agree: pop from the single active queue.
            if !self.register_reader(push_epoch) {
                continue;
            }

            let item = self.get_queue(push_epoch).pop();

            self.deregister_reader(push_epoch);

            return item;
        }
    }

    /// Returns the capacity of the queue of the current push epoch. The queue
    /// of a lagging pop epoch is not included.
    fn capacity(&self) -> usize {
        loop {
            let push_epoch = self.push_epoch.load(Ordering::Acquire);
            if !self.register_reader(push_epoch) {
                continue;
            }
            let cap = self.get_queue(push_epoch).capacity();
            self.deregister_reader(push_epoch);
            return cap;
        }
    }

    /// Returns the length of the push-epoch queue plus, when the pop epoch
    /// lags behind, the length of the pop-epoch queue.
    fn len(&self) -> usize {
        loop {
            let push_epoch = self.push_epoch.load(Ordering::Acquire);
            if !self.register_reader(push_epoch) {
                continue;
            }

            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
            let pop_len = if pop_epoch != push_epoch {
                if !self.register_reader(pop_epoch) {
                    // Release the push-epoch registration before retrying.
                    self.deregister_reader(push_epoch);
                    continue;
                }

                let pop_len = self.get_queue(pop_epoch).len();
                self.deregister_reader(pop_epoch);
                pop_len
            } else {
                0
            };

            let len = self.get_queue(push_epoch).len() + pop_len;
            self.deregister_reader(push_epoch);
            return len;
        }
    }

    /// Returns `true` when the push-epoch queue is empty and, if the pop epoch
    /// lags behind, the pop-epoch queue is empty as well.
    fn is_empty(&self) -> bool {
        loop {
            let push_epoch = self.push_epoch.load(Ordering::Acquire);
            if !self.register_reader(push_epoch) {
                continue;
            }

            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
            let pop_is_empty = if pop_epoch != push_epoch {
                if !self.register_reader(pop_epoch) {
                    // Release the push-epoch registration before retrying.
                    self.deregister_reader(push_epoch);
                    continue;
                }

                let pop_is_empty = self.get_queue(pop_epoch).is_empty();
                self.deregister_reader(pop_epoch);
                pop_is_empty
            } else {
                true
            };

            let is_empty = self.get_queue(push_epoch).is_empty() && pop_is_empty;
            self.deregister_reader(push_epoch);
            return is_empty;
        }
    }

    /// Returns whether the queue of the current push epoch reports itself as
    /// full.
    fn is_full(&self) -> bool {
        loop {
            let push_epoch = self.push_epoch.load(Ordering::Acquire);
            if !self.register_reader(push_epoch) {
                continue;
            }
            let is_full = self.get_queue(push_epoch).is_full();
            self.deregister_reader(push_epoch);

            return is_full;
        }
    }
}
321
322impl<T, Q, S> NewSized for GrowableQueueCore<T, Q, S>
323where
324 Q: NewSized,
325{
326 fn with_size(size: usize) -> GrowableQueueCore<T, Q, S> {
327 GrowableQueueCore::with_slot(size)
328 }
329}
330
331impl<S> NewSized for QueueCore<BoxedBuffer<S>>
332where
333 S: Default,
334{
335 fn with_size(size: usize) -> Self {
336 Self::new_in(BoxedBuffer::new(size))
337 }
338}
339
/// A multi-producer multi-consumer queue that can be enlarged at runtime
/// through [`Growable::grow_by`].
///
/// `T` is the item type and `S` selects the slot representation via
/// [`SlotType`]; it defaults to [`Auto`].
pub struct DynamicQueue<T, S = Auto>
where
    S: SlotType<T>,
    T: AsPackedValue,
{
    /// Growable core over boxed-buffer queue cores; every trait method of
    /// `DynamicQueue` forwards to it.
    inner: GrowableQueueCore<T, QueueCore<BoxedBuffer<S::Slot>>, S>,
}
348
349impl<T> DynamicQueue<T, Auto>
350where
351 T: AsPackedValue,
352{
353 pub fn new(size: usize) -> Self {
356 Self::with_slot::<Auto>(size)
357 }
358
359 pub fn with_slot<S>(size: usize) -> DynamicQueue<T, S>
362 where
363 S: SlotType<T>,
364 {
365 DynamicQueue {
366 inner: GrowableQueueCore::with_slot::<S>(size),
367 }
368 }
369}
370
371impl<T, S> MPMCQueue for DynamicQueue<T, S>
372where
373 T: AsPackedValue,
374 S: SlotType<T>,
375{
376 type Item = T;
377
378 fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
379 self.inner.push(item)
380 }
381
382 fn pop(&self) -> Option<Self::Item> {
383 self.inner.pop()
384 }
385
386 fn len(&self) -> usize {
387 self.inner.len()
388 }
389
390 fn capacity(&self) -> usize {
391 self.inner.capacity()
392 }
393
394 fn is_empty(&self) -> bool {
395 self.inner.is_empty()
396 }
397
398 fn is_full(&self) -> bool {
399 self.inner.is_full()
400 }
401}
402
403impl<T, S> Growable for DynamicQueue<T, S>
404where
405 T: AsPackedValue,
406 S: SlotType<T>,
407{
408 fn grow_by(&self, by: usize) -> bool {
409 self.inner.grow_by(by)
410 }
411}