Skip to main content

nblf_queue/growable/
queue.rs

1use alloc::boxed::Box;
2use core::{marker::PhantomData, ptr::null_mut};
3
4use crossbeam_utils::CachePadded;
5
6use crate::{
7    Growable,
8    MPMCQueue,
9    core::{
10        AsPackedValue,
11        queue::QueueCore,
12        slots::{Auto, SlotType},
13    },
14    growable::NewSized,
15    owned::buffer::BoxedBuffer,
16    sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
17    utils::Backoff,
18};
19
/// A lock-free, non-blocking queue that may dynamically grow.
///
/// Two inner queues of type `Q` are kept behind raw pointers. The parity of an
/// epoch counter (`epoch % 2`) selects which of the two an operation targets.
pub(crate) struct GrowableQueueCore<T, Q, S = Auto> {
    /// Heap-allocated inner queues (created with `Box::into_raw`), indexed by `epoch % 2`.
    cores: [AtomicPtr<Q>; 2],
    /// Epoch whose queue receives new pushes; incremented by each successful `grow_by`.
    push_epoch: CachePadded<AtomicUsize>,
    /// Epoch whose queue `pop` currently reads from; advanced by `pop` once the
    /// older queue has been drained.
    pop_epoch: CachePadded<AtomicUsize>,
    /// Number of in-flight `push` calls per queue index.
    active_pushes: CachePadded<[AtomicUsize; 2]>,
    /// Number of currently registered readers per queue index.
    active_reads: CachePadded<[AtomicUsize; 2]>,
    /// `true` while a `grow_by` call is replacing one of the inner queues.
    is_resizing: AtomicBool,
    /// Ties the item type `T` and slot type `S` to this queue without storing them.
    _slot: PhantomData<(S, T)>,
}
30
31impl<T, Q> GrowableQueueCore<T, Q, Auto>
32where
33    Q: NewSized,
34{
35    /// Constructs a new `Queue` with capacity `size` and slot type `S`.
36    /// `T` must fit into the slot type `S`
37    pub(crate) fn with_slot<S>(size: usize) -> GrowableQueueCore<T, Q, S> {
38        GrowableQueueCore {
39            cores: [
40                AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(size)))),
41                AtomicPtr::new(Box::into_raw(Box::new(Q::with_size(1)))),
42            ],
43            active_pushes: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
44            active_reads: [AtomicUsize::new(0), AtomicUsize::new(0)].into(),
45            push_epoch: AtomicUsize::new(0).into(),
46            pop_epoch: AtomicUsize::new(0).into(),
47            is_resizing: AtomicBool::new(false),
48            _slot: PhantomData,
49        }
50    }
51}
52
53impl<T, Q, S> Drop for GrowableQueueCore<T, Q, S> {
54    fn drop(&mut self) {
55        let left = self.cores[0].swap(null_mut(), Ordering::Acquire);
56        // Safety:
57        // No concurrent drops of this ds can happen.
58        // This queue was allocated in `new` or in `grow_by` with `Box::into_raw` and was not deallocated since then.
59        _ = unsafe { Box::from_raw(left) };
60
61        let right = self.cores[1].swap(null_mut(), Ordering::Acquire);
62        // Safety:
63        // No concurrent drops of this ds can happen.
64        // This queue was allocated in `new` or in `grow_by` with `Box::into_raw` and was not deallocated since then.
65        _ = unsafe { Box::from_raw(right) };
66    }
67}
68
69impl<T, Q, S> Growable for GrowableQueueCore<T, Q, S>
70where
71    Q: NewSized + MPMCQueue<Item = T>,
72{
73    fn grow_by(&self, by: usize) -> bool {
74        let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
75        let push_epoch = self.push_epoch.load(Ordering::Acquire);
76
77        if pop_epoch != push_epoch {
78            return false;
79        }
80
81        if self.active_reads[(push_epoch + 1) % 2].load(Ordering::Acquire) != 0 {
82            // could happen if some thread started reading before pop_epoch got updated
83            return false;
84        }
85
86        if self.is_resizing.swap(true, Ordering::AcqRel) {
87            return false;
88        }
89
90        if self.push_epoch.load(Ordering::Acquire) != push_epoch {
91            // could happen if an entire resize happens between load and this check
92            self.is_resizing.store(false, Ordering::Release);
93            return false;
94        }
95
96        // at this poitn we know that
97        // a) no concurrent resize is happening
98        // b) since pop_epoch == push_epoch the old queue is empty.
99        // c) since pop_epoch == push_epoch AND active_reads == 0, we know that active_reads is STILL 0, becasue noone will acces the stale queue
100
101        let old_idx = (push_epoch + 1) % 2;
102        let mut backoff = Backoff::new();
103
104        while self.active_reads[old_idx].load(Ordering::Acquire) != 0 {
105            backoff.backoff();
106        }
107
108        debug_assert_eq!(
109            self.active_pushes[(push_epoch + 1) % 2].load(Ordering::SeqCst),
110            0
111        );
112
113        let new_queue = Box::into_raw(Box::new(Q::with_size(self.capacity() + by)));
114
115        // Safety:
116        // since pop_epoch == push_epoch all concurrent threads acces the queue at push_epoch % 2.
117        // pop ensures that no pushes are in flight to the old queue anymore and that it is empty. We can safely drop it.
118        let old_queue = self.cores[(push_epoch + 1) % 2].swap(new_queue, Ordering::AcqRel);
119        self.push_epoch.fetch_add(1, Ordering::Release);
120
121        // Safety:
122        // old_queue was ocnstucted from a Bos::into_raw and is dropped only once, as ensured by epoch guards
123        let q = unsafe { Box::from_raw(old_queue) };
124
125        debug_assert!(q.pop().is_none());
126
127        self.is_resizing.store(false, Ordering::Release);
128        true
129    }
130}
131
132impl<T, Q, S> GrowableQueueCore<T, Q, S> {
133    fn get_queue(&self, epoch: usize) -> &Q {
134        let queue = self.cores[epoch % 2].load(Ordering::Acquire);
135        // Safety:
136        // It is guranteed by `grow_by` that no concurrent mutable access can happen to any queue in cores.
137        // It is safe to access it concurrently via shared ref, as long as queue core is Sync.
138        unsafe { &*queue }
139    }
140
141    fn register_reader(&self, target_epoch: usize) -> bool {
142        self.active_reads[target_epoch % 2].fetch_add(1, Ordering::Release);
143
144        let current_push = self.push_epoch.load(Ordering::SeqCst);
145        let current_pop = self.pop_epoch.load(Ordering::SeqCst);
146
147        // It is safe to read if the target epoch is still structurally active
148        if target_epoch != current_push && target_epoch != current_pop {
149            self.deregister_reader(target_epoch);
150            return false;
151        }
152        true
153    }
154
155    fn deregister_reader(&self, epoch: usize) {
156        self.active_reads[epoch % 2].fetch_sub(1, Ordering::Release);
157    }
158}
159
160impl<T, Q, S> MPMCQueue for GrowableQueueCore<T, Q, S>
161where
162    Q: MPMCQueue<Item = T>,
163{
164    type Item = T;
165
166    fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
167        loop {
168            let push_epoch = self.push_epoch.load(Ordering::Acquire);
169            self.active_pushes[push_epoch % 2].fetch_add(1, Ordering::Release);
170
171            if self.push_epoch.load(Ordering::SeqCst) == push_epoch {
172                let r = self.get_queue(push_epoch).push(item);
173
174                self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
175                return r;
176            }
177            self.active_pushes[push_epoch % 2].fetch_sub(1, Ordering::Release);
178        }
179    }
180
181    fn pop(&self) -> Option<Self::Item> {
182        loop {
183            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
184            let push_epoch = self.push_epoch.load(Ordering::Acquire);
185
186            if pop_epoch != push_epoch {
187                // drain old buffer
188
189                if !self.register_reader(pop_epoch) {
190                    continue;
191                }
192
193                // it is safe to call get_queue on pop_epoch here, since no resize can happen while we have not updated pop_epoch and reads on this epoch are happening
194                let item = self.get_queue(pop_epoch).pop();
195
196                self.deregister_reader(pop_epoch);
197
198                if item.is_some() {
199                    return item;
200                }
201
202                if self.active_pushes[pop_epoch % 2].load(Ordering::Acquire) == 0 {
203                    if !self.register_reader(pop_epoch) {
204                        continue;
205                    }
206
207                    let final_item = self.get_queue(pop_epoch).pop();
208
209                    self.deregister_reader(pop_epoch);
210
211                    if final_item.is_some() {
212                        return final_item;
213                    }
214
215                    _ = self.pop_epoch.compare_exchange_weak(
216                        pop_epoch,
217                        pop_epoch + 1,
218                        Ordering::AcqRel,
219                        Ordering::Relaxed,
220                    );
221                }
222
223                continue;
224            }
225
226            if !self.register_reader(push_epoch) {
227                continue;
228            }
229
230            let item = self.get_queue(push_epoch).pop();
231
232            self.deregister_reader(push_epoch);
233
234            return item;
235        }
236    }
237
238    fn capacity(&self) -> usize {
239        // the capacity of the currently active queue, i.e. the number of elements that can be pushed directly after resize
240        loop {
241            let push_epoch = self.push_epoch.load(Ordering::Acquire);
242            if !self.register_reader(push_epoch) {
243                continue;
244            }
245            let cap = self.get_queue(push_epoch).capacity();
246            self.deregister_reader(push_epoch);
247            return cap;
248        }
249    }
250
251    fn len(&self) -> usize {
252        // the total elements in the queue. Note that len can be > capacity.
253        loop {
254            let push_epoch = self.push_epoch.load(Ordering::Acquire);
255            if !self.register_reader(push_epoch) {
256                continue;
257            }
258
259            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
260            let pop_len = if pop_epoch != push_epoch {
261                if !self.register_reader(pop_epoch) {
262                    self.deregister_reader(push_epoch);
263                    continue;
264                }
265
266                let pop_len = self.get_queue(pop_epoch).len();
267                self.deregister_reader(pop_epoch);
268                pop_len
269            } else {
270                0
271            };
272
273            let len = self.get_queue(push_epoch).len() + pop_len;
274            self.deregister_reader(push_epoch);
275            return len;
276        }
277    }
278
279    fn is_empty(&self) -> bool {
280        // the queue is empty if pop() returns None
281        loop {
282            let push_epoch = self.push_epoch.load(Ordering::Acquire);
283            if !self.register_reader(push_epoch) {
284                continue;
285            }
286
287            let pop_epoch = self.pop_epoch.load(Ordering::Acquire);
288            let pop_is_empty = if pop_epoch != push_epoch {
289                if !self.register_reader(pop_epoch) {
290                    self.deregister_reader(push_epoch);
291                    continue;
292                }
293
294                let pop_is_empty = self.get_queue(pop_epoch).is_empty();
295                self.deregister_reader(pop_epoch);
296                pop_is_empty
297            } else {
298                true
299            };
300
301            let is_empty = self.get_queue(push_epoch).is_empty() && pop_is_empty;
302            self.deregister_reader(push_epoch);
303            return is_empty;
304        }
305    }
306
307    fn is_full(&self) -> bool {
308        // the queue is full if push() fails
309        loop {
310            let push_epoch = self.push_epoch.load(Ordering::Acquire);
311            if !self.register_reader(push_epoch) {
312                continue;
313            }
314            let is_full = self.get_queue(push_epoch).is_full();
315            self.deregister_reader(push_epoch);
316
317            return is_full;
318        }
319    }
320}
321
322impl<T, Q, S> NewSized for GrowableQueueCore<T, Q, S>
323where
324    Q: NewSized,
325{
326    fn with_size(size: usize) -> GrowableQueueCore<T, Q, S> {
327        GrowableQueueCore::with_slot(size)
328    }
329}
330
331impl<S> NewSized for QueueCore<BoxedBuffer<S>>
332where
333    S: Default,
334{
335    fn with_size(size: usize) -> Self {
336        Self::new_in(BoxedBuffer::new(size))
337    }
338}
339
/// A lock-free, non-blocking queue that may dynamically grow its capacity.
///
/// `T` is the item type and `S` selects the slot type used to store items
/// (`Auto` by default). This is a thin wrapper around `GrowableQueueCore`
/// over boxed-buffer queue cores.
pub struct DynamicQueue<T, S = Auto>
where
    S: SlotType<T>,
    T: AsPackedValue,
{
    /// The growable core that every queue operation is forwarded to.
    inner: GrowableQueueCore<T, QueueCore<BoxedBuffer<S::Slot>>, S>,
}
348
349impl<T> DynamicQueue<T, Auto>
350where
351    T: AsPackedValue,
352{
353    /// Constructs a new `DynamicQueue` with capacity `size` and slot type `Auto`.
354    /// `T` must fit into the chosen slot type
355    pub fn new(size: usize) -> Self {
356        Self::with_slot::<Auto>(size)
357    }
358
359    /// Constructs a new `DynamicQueue` with capacity `size` and slot type `S`.
360    /// `T` must fit into the slot type `S`
361    pub fn with_slot<S>(size: usize) -> DynamicQueue<T, S>
362    where
363        S: SlotType<T>,
364    {
365        DynamicQueue {
366            inner: GrowableQueueCore::with_slot::<S>(size),
367        }
368    }
369}
370
371impl<T, S> MPMCQueue for DynamicQueue<T, S>
372where
373    T: AsPackedValue,
374    S: SlotType<T>,
375{
376    type Item = T;
377
378    fn push(&self, item: Self::Item) -> Result<(), Self::Item> {
379        self.inner.push(item)
380    }
381
382    fn pop(&self) -> Option<Self::Item> {
383        self.inner.pop()
384    }
385
386    fn len(&self) -> usize {
387        self.inner.len()
388    }
389
390    fn capacity(&self) -> usize {
391        self.inner.capacity()
392    }
393
394    fn is_empty(&self) -> bool {
395        self.inner.is_empty()
396    }
397
398    fn is_full(&self) -> bool {
399        self.inner.is_full()
400    }
401}
402
403impl<T, S> Growable for DynamicQueue<T, S>
404where
405    T: AsPackedValue,
406    S: SlotType<T>,
407{
408    fn grow_by(&self, by: usize) -> bool {
409        self.inner.grow_by(by)
410    }
411}