//! Provides [ArenaQueue] - a [PriorityQueue] where entries are placed in an "arena."
//!
//! Instead of having many separate allocations like [BoxQueue], this places all entries in a
//! single, sequential block of memory to leverage cpu cache more effectively.
use super::*;
use crate::priority::Priority;
#[cfg(not(feature = "std"))]
use alloc::alloc::{alloc, dealloc, handle_alloc_error, realloc};
use core::{alloc::Layout, cmp::Ordering, marker::PhantomData, mem::MaybeUninit};
#[cfg(feature = "std")]
use std::alloc::{alloc, dealloc, handle_alloc_error, realloc};

#[cfg(not(feature = "std"))]
extern crate alloc;
/// Helper trait used for [ArenaQueue] to support both single and dual link lists.
///
/// This isn't intended to be used externally.  Use [DualLinkArenaQueueNode] and
/// [SingleLinkArenaQueueNode].
#[doc(hidden)]
pub trait ArenaQueueNode: Sized {
    /// The priority-carrying payload stored in each node.
    type Data: Priority;
    /// Byte offset of the next node ([SENTINEL] = end of list, [UNALLOCATED] = free slot).
    fn next(&self) -> usize;
    fn set_next(&mut self, val: usize);
    /// Byte offset of the previous node; only meaningful when `HAS_PREV` is true.
    fn prev(&self) -> usize;
    fn set_prev(&mut self, val: usize);
    /// Borrows the payload. Callers must ensure the slot holds initialized data.
    fn data(&self) -> &Self::Data;
    /// Raw access to the (possibly uninitialized) payload storage.
    fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data>;

    /// Whether nodes store a usable `prev` link (true for dual-linked nodes).
    const HAS_PREV: bool;
}

/// A single linked (next only) arena queue.
///
/// Short for [`ArenaQueue`]`<`[`SingleLinkArenaQueueNode<P>`]`>`
pub type SingleLinkArenaQueue<P> = ArenaQueue<SingleLinkArenaQueueNode<P>>;
/// A dual linked [ArenaQueue].
///
/// Short for [`ArenaQueue`]`<`[`DualLinkArenaQueueNode<P>`]`>`
pub type DualLinkArenaQueue<P> = ArenaQueue<DualLinkArenaQueueNode<P>>;

/// An arena queue - instead of having each node as a separate allocation, a block of memory is
/// allocated for all of the nodes.
///
/// The block will never shrink, but will grow to accommodate more slots as needed.
///
/// Currently, the amount of slots allocated is always a power of two and is doubled when capacity is
/// exceeded.
///
/// If this is an issue, [BoxQueue] can be used with the `box-queue` feature flag enabled.
///
/// It's likely easier to use this via the type aliases `SingleLinkArenaQueue` and
/// `DualLinkArenaQueue`.
pub struct ArenaQueue<N: ArenaQueueNode> {
    /// A pointer to the block of nodes. This is null until entries are queued.
    data: *mut N,
    /// The count of slots allocated (always a power of 2)
    size: usize,
    /// The count of slots used
    used: usize,
    /// The last known "good" / uninitialized slot, as a slot *index* (not a byte
    /// offset).  This may not always be accurate (in which case we iterate to
    /// find a free slot).
    good: usize,
    /// The *byte* offset of the head node from `data`; [SENTINEL] when empty.
    head: usize,
}

#[cfg(feature = "const-default")]
impl<N: ArenaQueueNode> const_default::ConstDefault for ArenaQueue<N> {
    /// A compile-time empty queue, identical to `Default::default()`.
    const DEFAULT: Self = Self {
        // Bug fix: this used `std::ptr::null_mut()`, which fails to compile in
        // `no_std` builds (which this file explicitly supports); `core::ptr`
        // works everywhere.
        data: core::ptr::null_mut(),
        size: 0,
        used: 0,
        good: 0,
        // Bug fix: this was `0`, disagreeing with `Default` (`head: 1`).  An
        // empty queue must have `head == SENTINEL`; with `0`, `head_handle`
        // would report a head entry at offset 0 of a null arena.
        head: SENTINEL,
    };
}

79impl<D: ArenaQueueNode> core::fmt::Debug for ArenaQueue<D> {
80    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
81        f.debug_struct("SeqQueue")
82            .field("data", &self.data)
83            .field("size", &self.size)
84            .field("used", &self.used)
85            .field("good", &self.good)
86            .field("head", &self.head)
87            .finish()
88    }
89}
90
// SAFETY: the queue owns its arena exclusively through the raw `data` pointer,
// so sending/sharing the queue is as safe as sending/sharing `D` itself.
// NOTE(review): `&self` methods hand out `&mut` node references internally (see
// `nodes`/`node`), so concurrent use still needs external synchronization —
// confirm against the `PriorityQueue` trait's contract.
unsafe impl<D: Send + ArenaQueueNode> Send for ArenaQueue<D> {}
unsafe impl<D: Sync + ArenaQueueNode> Sync for ArenaQueue<D> {}

94impl<N: ArenaQueueNode> Default for ArenaQueue<N> {
95    fn default() -> Self {
96        Self {
97            data: Default::default(),
98            size: Default::default(),
99            used: Default::default(),
100            good: Default::default(),
101            head: 1,
102        }
103    }
104}
105
/// The sentinel value used in the last node in the queue.
///
/// A *byte* offset of 1 can never address a real node: node types have
/// alignment >= 2 (see the `repr(align(2))` on [SingleLinkArenaQueueNode]),
/// so valid offsets are always even.
const SENTINEL: usize = 1;
/// The value stored in `next` of uninitialized (free) slots.
const UNALLOCATED: usize = usize::MAX;

// This is needed because in theory a usize could be 1 byte, and D could be a zero-width type,
// making the size and alignment of this type to be 1.
// (and thus an offset of 1 would be possible, which would conflict with our sentinel value)
//
// In practice this is probably never going to happen.
#[repr(align(2))]
/// A node for an [ArenaQueue] which links only to the next node.
///
/// This is likely a bit verbose for normal use; the [SingleLinkArenaQueue] type alias is
/// recommended instead.
pub struct SingleLinkArenaQueueNode<D: Priority> {
    /// Byte offset of the next node, [SENTINEL], or [UNALLOCATED] (free slot).
    next: usize,
    /// The payload; only initialized while the slot is allocated.
    data: MaybeUninit<D>,
}

impl<D: Priority> ArenaQueueNode for SingleLinkArenaQueueNode<D> {
    type Data = D;

    #[inline(always)]
    fn next(&self) -> usize {
        self.next
    }

    #[inline(always)]
    fn set_next(&mut self, val: usize) {
        self.next = val
    }

    /// Single-linked nodes have no `prev` field; `HAS_PREV` is false, so callers
    /// must never reach this (it should be dead-code eliminated).
    #[inline(always)]
    fn prev(&self) -> usize {
        unreachable!("this fn should never be compiled")
    }

    /// No-op: there is no `prev` link to maintain.
    #[inline(always)]
    fn set_prev(&mut self, _: usize) {}

    #[inline(always)]
    fn data(&self) -> &Self::Data {
        // SAFETY: callers only invoke this on allocated slots, whose payload has
        // been initialized via `data_mut().write(..)`.
        unsafe { self.data.assume_init_ref() }
    }

    #[inline(always)]
    fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data> {
        &mut self.data
    }

    const HAS_PREV: bool = false;
}

/// A node for an [ArenaQueue] which links to both the previous and next nodes.
///
/// This is likely a bit verbose for normal use; the [DualLinkArenaQueue] type alias is
/// recommended instead.
///
/// No `repr(align(2))` is needed here: the two `usize` links already guarantee a
/// size (and thus element stride) of at least 2, so offset 1 cannot occur.
pub struct DualLinkArenaQueueNode<P: Priority> {
    /// Byte offset of the next node, [SENTINEL], or [UNALLOCATED] (free slot).
    next: usize,
    /// Byte offset of the previous node, or [SENTINEL] for the head.
    prev: usize,
    /// The payload; only initialized while the slot is allocated.
    data: MaybeUninit<P>,
}

impl<P: Priority> ArenaQueueNode for DualLinkArenaQueueNode<P> {
    type Data = P;

    #[inline(always)]
    fn next(&self) -> usize {
        self.next
    }

    #[inline(always)]
    fn set_next(&mut self, val: usize) {
        self.next = val;
    }

    #[inline(always)]
    fn prev(&self) -> usize {
        self.prev
    }

    #[inline(always)]
    fn set_prev(&mut self, val: usize) {
        self.prev = val
    }

    #[inline(always)]
    fn data(&self) -> &Self::Data {
        // SAFETY: callers only invoke this on allocated slots, whose payload has
        // been initialized via `data_mut().write(..)`.
        unsafe { self.data.assume_init_ref() }
    }

    #[inline(always)]
    fn data_mut(&mut self) -> &mut MaybeUninit<Self::Data> {
        &mut self.data
    }

    const HAS_PREV: bool = true;
}

/// An opaque handle to an [ArenaQueue] entry.
///
/// Internally this is the entry's *byte* offset from the start of the arena,
/// which is why handles stay valid across arena growth (offsets are relative
/// to `data`, even when `realloc` moves the block).
pub struct ArenaQueueHandle<N: ArenaQueueNode> {
    /// Byte offset of the entry from the arena base pointer.
    offset: usize,
    /// Ties the handle to the node type without storing one.
    data: PhantomData<N>,
}

impl<N: ArenaQueueNode> ArenaQueueHandle<N> {
    /// Wraps a raw byte offset into a handle.
    const fn new(offset: usize) -> Self {
        Self {
            offset,
            data: PhantomData,
        }
    }
}

222#[repr(transparent)]
223/// An opaque "shared" handle to an [ArenaQueue] entry.
224pub struct SharedArenaQueueHandle<N: ArenaQueueNode>(ArenaQueueHandle<N>);
225
226impl<N: ArenaQueueNode> SharedArenaQueueHandle<N> {
227    #[inline(always)]
228    const fn maybe_new(offset: usize) -> Option<Self> {
229        if offset != SENTINEL {
230            Some(Self(ArenaQueueHandle::new(offset)))
231        } else {
232            None
233        }
234    }
235}
236
impl<N: ArenaQueueNode> AsRef<ArenaQueueHandle<N>> for SharedArenaQueueHandle<N> {
    /// Borrows the inner (non-shared) handle.
    #[inline(always)]
    fn as_ref(&self) -> &ArenaQueueHandle<N> {
        &self.0
    }
}

impl<N: ArenaQueueNode> From<ArenaQueueHandle<N>> for SharedArenaQueueHandle<N> {
    /// Promotes an exclusive handle into a shared one (free thanks to
    /// `repr(transparent)`).
    #[inline(always)]
    fn from(value: ArenaQueueHandle<N>) -> Self {
        Self(value)
    }
}

impl<N: ArenaQueueNode> PriorityQueueHandle<N::Data> for ArenaQueueHandle<N> {
    // None: the handle stores only a byte offset, so the payload cannot be
    // resolved without the queue's base pointer — there is no pure load fn.
    const LOAD_PURE: Option<unsafe fn(&Self) -> &N::Data> = None;
}

255impl<N: ArenaQueueNode> ArenaQueue<N> {
256    #[inline]
257    fn nodes<'a>(&self) -> &'a mut [N] {
258        unsafe { core::slice::from_raw_parts_mut(self.data, self.size) }
259    }
260
261    #[inline]
262    fn node<'a>(&self, offset: usize) -> &'a mut N {
263        unsafe { &mut *self.data.byte_add(offset) }
264    }
265
266    #[inline]
267    fn offset_of(&self, node: *const N) -> usize {
268        node as usize - self.data as usize
269    }
270
271    #[inline]
272    fn iter_at(&self, start: usize) -> ArenaQueueIterator<'_, N> {
273        ArenaQueueIterator {
274            queue: self,
275            current: if start != SENTINEL {
276                Some(self.node(start))
277            } else {
278                None
279            },
280        }
281    }
282
283    #[inline]
284    fn expand(&mut self) {
285        let old_size = self.size;
286        self.data = if self.size == 0 {
287            self.size = 4;
288
289            unsafe {
290                // std::alloc::Global
291                alloc(Layout::array::<N>(self.size).unwrap())
292            }
293        } else {
294            let old_layout = Layout::array::<N>(self.size).unwrap();
295            self.size <<= 1;
296
297            unsafe { realloc(self.data.cast(), old_layout, old_layout.size() << 1) }
298        }
299        .cast::<N>();
300
301        for item in &mut self.nodes()[old_size..] {
302            item.set_next(UNALLOCATED);
303        }
304
305        self.good = old_size;
306    }
307
308    /// Returns a reference to the new node, plus:
309    /// (if found while searching for a location to place the node): a node before this one
310    /// (hopefully the latest one) and a bool which indicates that the returned previous node has
311    /// the same priority, thus we can place the new node immediately after it
312    /// This is only used via PriorityQueue.enqueue; it's a separate fn for readability
313    #[inline]
314    fn emplace<'a>(&mut self, data: N::Data) -> (&'a mut N, Option<&'a mut N>, bool) {
315        let nodes = self.nodes();
316
317        let mut lowest_prio_prev: Option<&mut N> = None;
318        let mut found_prev: bool = false;
319        // idx is alwayys a power of 2
320        let idx_mask = self.size - 1;
321        let mut idx_to_try = self.good;
322
323        loop {
324            let node = &mut self.nodes()[idx_to_try];
325            let node_data = node.data();
326
327            if node.next() == UNALLOCATED {
328                break;
329            }
330
331            // fast modulo (size is always a power of 2)
332            idx_to_try = (idx_to_try + 1) & idx_mask;
333
334            if found_prev {
335                continue;
336            }
337
338            match data.compare_new(node_data) {
339                Ordering::Greater => {}
340                Ordering::Equal => {
341                    lowest_prio_prev = Some(node);
342                    found_prev = true;
343                }
344                Ordering::Less => {
345                    if lowest_prio_prev
346                        .as_ref()
347                        .is_none_or(|b| node_data.compare(b.data()).is_gt())
348                    {
349                        lowest_prio_prev = Some(node);
350                    }
351                }
352            }
353        }
354
355        let new_node = &mut nodes[idx_to_try];
356        self.good = (idx_to_try + 1) & idx_mask;
357        self.used += 1;
358        new_node.data_mut().write(data);
359
360        (new_node, lowest_prio_prev, found_prev)
361    }
362
363    #[inline]
364    /// Assumes that the node definitely has a previous node (i.e. is not head)
365    fn prev_for<'a>(&self, node: &N, offset: usize) -> &'a mut N {
366        if N::HAS_PREV {
367            return self.node(node.prev());
368        }
369
370        // need to search for previous node
371        for x in self.nodes() {
372            if x.next() == offset {
373                return x;
374            }
375        }
376
377        unreachable!()
378    }
379
380    #[inline]
381    /// prev + next must be correlated with node.prev() & node.next()
382    fn reposition(
383        &mut self,
384        node: &mut N,
385        node_offset: usize,
386        prev: Option<&mut N>,
387        next: Option<&mut N>,
388    ) {
389        if let Some(pr) = prev {
390            pr.set_next(node.next());
391            if N::HAS_PREV {
392                if let Some(nxt) = next {
393                    nxt.set_prev(node.prev())
394                }
395            }
396
397            return self.insert(node, node_offset, None);
398        }
399
400        self.head = node.next();
401        if let Some(nxt) = next {
402            nxt.set_prev(SENTINEL);
403        }
404
405        // since this node was just in head and that was invalid, then we can start after head.
406        self.insert(node, node_offset, Some(self.node(self.head)));
407    }
408
409    #[inline]
410    fn insert(&mut self, new_node: &mut N, new_offset: usize, after: Option<&mut N>) {
411        let new_data = new_node.data();
412        // Now we need to find out where to start searching for a spot
413        let mut prev = match after {
414            // If, while finding a slot for our new node, we found an existing item with higher
415            // priority: we can start with that.
416            Some(x) => x,
417            None => {
418                // We didn't find an existing node with higher priority.
419                if self.head == SENTINEL {
420                    // The queue is empty (as we have no head node)
421                    // So we just need to place and set the head to the new node
422                    self.head = new_offset;
423                    new_node.set_next(SENTINEL);
424
425                    new_node.set_prev(SENTINEL);
426
427                    return;
428                }
429
430                let head = self.node(self.head);
431
432                // Check to see if we can place our new node before the current head (and if so, do
433                // it)
434                if new_data.compare_new(head.data()).is_ge() {
435                    new_node.set_next(self.head);
436                    self.head = new_offset;
437                    new_node.set_prev(SENTINEL);
438                    head.set_prev(new_offset);
439
440                    return;
441                }
442
443                head
444            }
445        };
446
447        // Iterate until we find a node we can place the new one after
448        while prev.next() != SENTINEL {
449            let next = self.node(prev.next());
450
451            if new_data.compare_new(next.data()).is_ge() {
452                // for single-ended nodes this is a no-op which should get optimized out
453                next.set_prev(new_offset);
454                break;
455            }
456
457            prev = next;
458        }
459
460        new_node.set_next(prev.next());
461        new_node.set_prev(self.offset_of(prev));
462        prev.set_next(new_offset);
463    }
464}
465
/// In-order iterator over an [ArenaQueue], following `next` links from a start node.
struct ArenaQueueIterator<'a, N: ArenaQueueNode> {
    queue: &'a ArenaQueue<N>,
    /// The node to yield next; `None` once the end of the list was reached.
    current: Option<&'a N>,
}

471impl<'a, N: ArenaQueueNode> Iterator for ArenaQueueIterator<'a, N> {
472    type Item = &'a N::Data;
473
474    #[inline]
475    fn next(&mut self) -> Option<Self::Item> {
476        let current = self.current.take()?;
477
478        if current.next() != 1 {
479            self.current = Some(self.queue.node(current.next()))
480        }
481
482        Some(current.data())
483    }
484}
485
impl<N: ArenaQueueNode> Drop for ArenaQueue<N> {
    fn drop(&mut self) {
        if self.used != 0 {
            #[cfg(debug_assertions)]
            // should never happen in our code! but we can't just panic as this is technically a
            // public struct + trait
            // NOTE(review): in debug builds this panic skips both the payload
            // drops below and the `dealloc` at the end, leaking the arena (and
            // aborts if already unwinding) — deliberate, to surface the bug.
            panic!("dropped a queue with remaining nodes!!!! {:?}", self);

            #[cfg(not(debug_assertions))]
            for node in self.nodes() {
                if node.next() != UNALLOCATED {
                    // SAFETY: slots whose `next` is not UNALLOCATED hold
                    // initialized payloads.
                    unsafe {
                        node.data_mut().assume_init_drop();
                    }
                }
            }
        }

        if self.size != 0 {
            // SAFETY: `data` was allocated in `expand` with exactly this layout.
            unsafe { dealloc(self.data.cast(), Layout::array::<N>(self.size).unwrap()) };
        };
    }
}

// SAFETY: upholds the linkage invariants required by `PriorityQueue`
// (NOTE(review): the trait's exact safety contract is defined in `super` and
// not visible here — confirm).
unsafe impl<N: ArenaQueueNode> PriorityQueue<N::Data> for ArenaQueue<N> {
    type Handle = ArenaQueueHandle<N>;
    type SharedHandle = SharedArenaQueueHandle<N>;

    /// Adds `data` to the queue, growing the arena when full, and returns a
    /// handle (the entry's byte offset) for later dequeue/update.
    #[inline]
    fn enqueue(&mut self, data: N::Data) -> Self::Handle {
        // `emplace` loops forever on a full arena, so grow first.
        if self.size == self.used {
            self.expand();
        }

        let (new_node, biggest_prev, found_prev) = self.emplace(data);
        let new_offset = self.offset_of(new_node);
        let handle = ArenaQueueHandle {
            offset: new_offset,
            data: PhantomData,
        };

        // While placing our node, we found a node we can place it directly after
        if found_prev {
            let prev = biggest_prev.unwrap();
            let ex_next = prev.next();

            new_node.set_next(ex_next);
            prev.set_next(new_offset);
            // for single-ended nodes this is a no-op which should get optimized out (including the
            // lookup of ex_next)
            if ex_next != SENTINEL {
                self.node(ex_next).set_prev(new_offset);
            }
            new_node.set_prev(self.offset_of(prev));

            return handle;
        }

        self.insert(new_node, new_offset, biggest_prev);

        handle
    }

    /// Removes the entry behind `handle`, dropping its payload in place.
    ///
    /// Returns the payload of the *previous* entry (`None` when the removed
    /// entry was the head) and a shared handle to the *next* entry (`None`
    /// when the removed entry was the tail).
    #[inline]
    fn dequeue(&mut self, handle: Self::Handle) -> (Option<&N::Data>, Option<Self::SharedHandle>) {
        let old_node = self.node(handle.offset);
        // Remember the freed slot as an allocation hint; `good` is a slot
        // *index*, so convert from the handle's byte offset.
        self.good = handle.offset / core::mem::size_of::<N>();

        // SAFETY: a live handle always refers to an initialized payload.
        unsafe { old_node.data_mut().assume_init_drop() };
        let old_next = old_node.next();

        // Mark the slot as free.
        old_node.set_next(UNALLOCATED);
        self.used -= 1;

        if N::HAS_PREV && old_next != SENTINEL {
            self.node(old_next).set_prev(old_node.prev());
        }

        // is the target node the head node? if so we don't need to search for previous (as it
        // can't have one)
        if handle.offset == self.head {
            self.head = old_next;

            return (None, SharedArenaQueueHandle::maybe_new(old_next));
        };

        // since we just confirmed that old_node wasn't the head node, there must be a previous
        let prev = self.prev_for(&old_node, handle.offset);
        prev.set_next(old_next);

        return (
            Some(prev.data()),
            SharedArenaQueueHandle::maybe_new(old_next),
        );
    }

    /// Borrows the payload behind a handle; no linkage bookkeeping involved.
    #[inline]
    fn get_by_handle(&self, handle: &Self::Handle) -> &N::Data {
        // SAFETY: a live handle always refers to an initialized, in-bounds node.
        unsafe { &*self.data.byte_add(handle.offset) }.data()
    }

    /// Returns a shared handle to the head entry, or `None` when empty.
    #[inline(always)]
    fn head_handle(&self) -> Option<Self::SharedHandle> {
        (self.head != SENTINEL).then(|| ArenaQueueHandle::new(self.head).into())
    }

    /// Iterates payloads in queue order, starting at `after`'s entry when given
    /// (inclusive of that entry — NOTE(review): confirm the trait expects
    /// inclusive iteration here), otherwise at the head.
    #[inline]
    fn iter_at<'a>(&'a self, after: Option<&Self::Handle>) -> impl Iterator<Item = &'a N::Data>
    where
        N::Data: 'a,
    {
        self.iter_at(after.map(|x| x.offset).unwrap_or_else(|| self.head))
    }

    /// Runs `update` on the entry's payload; when it returns `true` (priority
    /// changed), the entry is relinked so queue ordering stays correct.
    fn update_node(&mut self, handle: &Self::Handle, update: impl FnOnce(&mut N::Data) -> bool) {
        let node = self.node(handle.offset);

        // SAFETY: a live handle always refers to an initialized payload.
        if !update(unsafe { node.data_mut().assume_init_mut() }) {
            return;
        }
        let node_off = self.offset_of(node);

        let data = node.data();
        let next_off = node.next();

        let mut maybe_prev =
            (handle.offset != self.head).then(|| self.prev_for(node, handle.offset));

        // check if the current position is valid after the current previous node (if not,
        // reposition)
        if let Some(prev) = maybe_prev.as_mut() {
            if data.compare(prev.data()).is_gt() {
                self.reposition(
                    node,
                    node_off,
                    Some(prev),
                    (next_off != SENTINEL).then(|| self.node(next_off)),
                );

                return;
            }
        }

        // check if the current position is valid before the next (if not, reposition)
        if next_off != SENTINEL {
            let next = self.node(next_off);
            if data.compare(next.data()).is_lt() {
                self.reposition(node, node_off, maybe_prev, Some(next));
            }
        }
    }

    /// Returns a shared handle to the entry after `handle`, or `None` at the tail.
    #[inline]
    fn get_next_handle(&self, handle: &Self::Handle) -> Option<Self::SharedHandle> {
        let next = self.node(handle.offset).next();

        (next != SENTINEL).then(|| {
            ArenaQueueHandle {
                offset: next,
                data: PhantomData,
            }
            .into()
        })
    }
}