debra_common/
bag.rs

1//! Data structures for storing and reclaiming retired records.
2
3#[cfg(not(feature = "std"))]
4use alloc::boxed::Box;
5
6use core::mem;
7
8use arrayvec::ArrayVec;
9use reclaim::{Reclaim, Retired};
10
11use crate::epoch::PossibleAge;
12use crate::EPOCH_CACHE_SIZE;
13
14////////////////////////////////////////////////////////////////////////////////////////////////////
15// BagPool
16////////////////////////////////////////////////////////////////////////////////////////////////////
17
18const BAG_POOL_SIZE: usize = 16;
19
20/// A pool for storing and recycling no longer used [`BagNode`]s of a thread.
21#[derive(Debug)]
22pub struct BagPool<R: Reclaim + 'static>(ArrayVec<[Box<BagNode<R>>; BAG_POOL_SIZE]>);
23
24impl<R: Reclaim + 'static> Default for BagPool<R> {
25    #[inline]
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl<R: Reclaim + 'static> BagPool<R> {
32    /// Creates a new (empty) [`BagPool`].
33    #[inline]
34    pub fn new() -> Self {
35        Self(ArrayVec::default())
36    }
37
38    /// Creates a new [`BagPool`] with the maximum number of pre-allocated bags.
39    #[inline]
40    pub fn with_bags() -> Self {
41        Self((0..BAG_POOL_SIZE).map(|_| BagNode::boxed()).collect())
42    }
43
44    /// Allocates a new [`BagNode`] from the pool or from the global allocator,
45    /// if the pools is currently empty.
46    #[inline]
47    fn allocate_bag(&mut self) -> Box<BagNode<R>> {
48        self.0.pop().unwrap_or_else(BagNode::boxed)
49    }
50
51    /// Recycles an empty [`BagNode`] back into the pool or deallocates it, if
52    /// the pool is currently full.
53    #[inline]
54    fn recycle_bag(&mut self, bag: Box<BagNode<R>>) {
55        debug_assert!(bag.is_empty());
56        if let Err(cap) = self.0.try_push(bag) {
57            mem::drop(cap.element());
58        }
59    }
60}
61
62////////////////////////////////////////////////////////////////////////////////////////////////////
// EpochBagQueues
64////////////////////////////////////////////////////////////////////////////////////////////////////
65
66const BAG_QUEUE_COUNT: usize = 3;
67
68/// A ring buffer with three [`BagQueue`]s
69///
70/// Each [`BagQueue`] caches the records that were retired in a specific epoch
71/// and these are continuously recycled, as the oldest records are reclaimed
72/// when the global epoch is advanced.
73#[derive(Debug)]
74pub struct EpochBagQueues<R: Reclaim + 'static> {
75    queues: [BagQueue<R>; BAG_QUEUE_COUNT],
76    curr_idx: usize,
77}
78
79impl<R: Reclaim + 'static> Default for EpochBagQueues<R> {
80    #[inline]
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl<R: Reclaim + 'static> EpochBagQueues<R> {
87    /// Creates a new set of [`EpochBagQueues`].
88    #[inline]
89    pub fn new() -> Self {
90        Self { queues: [BagQueue::new(), BagQueue::new(), BagQueue::new()], curr_idx: 0 }
91    }
92
93    /// Sorts the set of [`BagQueues`] from most recent to oldest and returns
94    /// the sorted array with three elements.
95    #[inline]
96    pub fn into_sorted(self) -> [BagQueue<R>; BAG_QUEUE_COUNT] {
97        let [a, b, c] = self.queues;
98        match self.curr_idx {
99            0 => [a, c, b],
100            1 => [b, a, c],
101            2 => [c, b, a],
102            _ => unreachable!(),
103        }
104    }
105
106    /// Retires the given `record` in the current [`BagQueue`].
107    #[inline]
108    pub fn retire_record(&mut self, record: Retired<R>, bag_pool: &mut BagPool<R>) {
109        self.retire_record_by_age(record, PossibleAge::SameEpoch, bag_pool);
110    }
111
112    /// Retires the given `record` in the appropriate [`BagQueue`] based on the
113    /// specified `age`.
114    #[inline]
115    pub fn retire_record_by_age(
116        &mut self,
117        record: Retired<R>,
118        age: PossibleAge,
119        bag_pool: &mut BagPool<R>,
120    ) {
121        let queue = match age {
122            PossibleAge::SameEpoch => &mut self.queues[self.curr_idx],
123            PossibleAge::OneEpoch => &mut self.queues[(self.curr_idx + 2) % BAG_QUEUE_COUNT],
124            PossibleAge::TwoEpochs => &mut self.queues[(self.curr_idx + 1) % BAG_QUEUE_COUNT],
125        };
126
127        queue.retire_record(record, bag_pool);
128    }
129
130    /// Retires the given `record` in the current [`BagQueue`] as the final
131    /// record of an exiting thread.
132    ///
133    /// # Safety
134    ///
135    /// After calling this method, no further calls to `retire_record`,
136    /// `retire_record_by_age` or `retire_final_record` must be made.
137    #[inline]
138    pub unsafe fn retire_final_record(&mut self, record: Retired<R>) {
139        let curr = &mut self.queues[self.curr_idx];
140        curr.head.retired_records.push_unchecked(record);
141    }
142
143    /// Advances the current epoch bag and reclaims all records in the oldest
144    /// bag.
145    ///
146    /// Full bags that are reclaimed during this process are emptied and sent
147    /// back into the `bag_pool`.
148    ///
149    /// # Safety
150    ///
151    /// It must ensured that two full epochs have actually passed since the
152    /// records in the oldest bag have been retired.
153    #[inline]
154    pub unsafe fn rotate_and_reclaim(&mut self, bag_pool: &mut BagPool<R>) {
155        self.curr_idx = (self.curr_idx + 1) % BAG_QUEUE_COUNT;
156        self.queues[self.curr_idx].reclaim_full_bags(bag_pool);
157    }
158}
159
160////////////////////////////////////////////////////////////////////////////////////////////////////
161// BagQueue
162////////////////////////////////////////////////////////////////////////////////////////////////////
163
164/// A LIFO queue of [`RetiredBag`]s.
165///
166/// All nodes except the first one are guaranteed to be full and the first one
167/// is guaranteed to always have enough space for at least one additional
168/// record.
169///
170/// # Note
171///
172/// A [`BagQueue`] must never be dropped when there are still un-reclaimed
173/// retired records in any of its [`BagNode`]s.
174#[derive(Debug)]
175pub struct BagQueue<R: Reclaim + 'static> {
176    head: Box<BagNode<R>>,
177}
178
179impl<R: Reclaim + 'static> BagQueue<R> {
180    /// Consumes `self`, returning the internal head [`BagNode`], it is
181    /// non-empty, otherwise returning [`None`] and dropping the [`BagQueue`].
182    #[inline]
183    pub fn into_non_empty(self) -> Option<Box<BagNode<R>>> {
184        if !self.is_empty() {
185            Some(self.head)
186        } else {
187            None
188        }
189    }
190
191    /// Creates a new [`BagQueue`].
192    #[inline]
193    fn new() -> Self {
194        Self { head: BagNode::boxed() }
195    }
196
197    /// Returns `true` if the head node is both empty and has no successor.
198    #[inline]
199    fn is_empty(&self) -> bool {
200        self.head.is_empty()
201    }
202
203    /// Retires the given `record`.
204    ///
205    /// If the operation requires a new bag to be allocated, it is attempted to
206    /// be taken from `bag_pool`.
207    #[inline]
208    fn retire_record(&mut self, record: Retired<R>, bag_pool: &mut BagPool<R>) {
209        // the head bag is guaranteed to never be full
210        unsafe { self.head.retired_records.push_unchecked(record) };
211        if self.head.retired_records.is_full() {
212            let mut old_head = bag_pool.allocate_bag();
213            mem::swap(&mut self.head, &mut old_head);
214            self.head.next = Some(old_head);
215        }
216    }
217
218    /// Reclaims all records in all **full** bags.
219    ///
220    /// # Safety
221    ///
222    /// It must be ensured that the contents of the queue are at least two
223    /// epochs old.
224    #[inline]
225    unsafe fn reclaim_full_bags(&mut self, bag_pool: &mut BagPool<R>) {
226        let mut node = self.head.next.take();
227        while let Some(mut bag) = node {
228            bag.reclaim_all();
229            node = bag.next.take();
230            bag_pool.recycle_bag(bag);
231        }
232    }
233}
234
235////////////////////////////////////////////////////////////////////////////////////////////////////
236// BagNode
237////////////////////////////////////////////////////////////////////////////////////////////////////
238
239/// A linked list node containing an inline vector for storing retired records.
240///
241/// # Panics
242///
243/// Dropping a non-empty [`BagNode`] leads to a panic in debug mode.
244#[derive(Debug)]
245pub struct BagNode<R: Reclaim + 'static> {
246    next: Option<Box<BagNode<R>>>,
247    retired_records: ArrayVec<[Retired<R>; EPOCH_CACHE_SIZE]>,
248}
249
250impl<R: Reclaim> BagNode<R> {
251    /// Reclaims all retired records in this [`BagNode`].
252    ///
253    /// # Safety
254    ///
255    /// It must be ensured that the contents of the queue are at least two
256    /// epochs old.
257    #[inline]
258    pub unsafe fn reclaim_all(&mut self) {
259        self.reclaim_inner();
260
261        let mut curr = self.next.take();
262        while let Some(mut node) = curr {
263            node.reclaim_inner();
264            curr = node.next.take();
265        }
266    }
267
268    /// Creates a new boxed [`BagNode`].
269    #[inline]
270    fn boxed() -> Box<Self> {
271        Box::new(Self { next: None, retired_records: ArrayVec::default() })
272    }
273
274    /// Returns `true` if the [`BagNode`] is empty.
275    #[inline]
276    fn is_empty(&self) -> bool {
277        self.next.is_none() && self.retired_records.len() == 0
278    }
279
280    #[inline]
281    unsafe fn reclaim_inner(&mut self) {
282        for mut record in self.retired_records.drain(..) {
283            record.reclaim();
284        }
285    }
286}
287
288impl<R: Reclaim + 'static> Drop for BagNode<R> {
289    #[inline]
290    fn drop(&mut self) {
291        debug_assert!(
292            self.is_empty(),
293            "`BagNode`s must not be dropped unless empty (would leak memory)"
294        );
295    }
296}
297
#[cfg(test)]
mod tests {
    use std::ptr::NonNull;

    use reclaim::leak::Leaking;

    use super::{BAG_QUEUE_COUNT, EPOCH_CACHE_SIZE};
    use crate::epoch::PossibleAge;

    type EpochBagQueues = super::EpochBagQueues<Leaking>;
    type BagPool = super::BagPool<Leaking>;
    type BagQueue = super::BagQueue<Leaking>;
    type Retired = reclaim::Retired<Leaking>;

    ////////////////////////////////////////////////////////////////////////////////////////////////
    // BagQueue tests
    ////////////////////////////////////////////////////////////////////////////////////////////////

    /// Creates a dummy retired record from a dangling pointer.
    fn retired() -> Retired {
        unsafe { Retired::new_unchecked(NonNull::<()>::dangling()) }
    }

    #[test]
    fn empty_bag_queue() {
        let queue = BagQueue::new();
        assert!(queue.is_empty());
        assert!(queue.into_non_empty().is_none());
    }

    #[test]
    fn non_empty_bag_queue() {
        let mut pool = BagPool::new();
        let mut queue = BagQueue::new();

        // fill the head node until exactly one free slot is left
        for _ in 1..EPOCH_CACHE_SIZE {
            queue.retire_record(retired(), &mut pool);
        }

        // the head is still the one and only node of the queue
        assert!(!queue.is_empty());
        assert!(queue.head.next.is_none());

        // the next record fills the head, which is then replaced by a new
        // empty head that is linked to the previous (full) one
        queue.retire_record(retired(), &mut pool);
        assert_eq!(queue.head.retired_records.len(), 0);
        assert!(queue.head.next.is_some());
        assert!(!queue.is_empty());

        // nodes must be emptied before they are dropped, otherwise the
        // assertion in the `Drop` impl would fire
        let mut head = queue.into_non_empty().unwrap();
        unsafe { head.reclaim_all() };
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////
    // EpochBagQueues tests
    ////////////////////////////////////////////////////////////////////////////////////////////////

    #[test]
    fn rotate_and_reclaim() {
        let mut pool = BagPool::new();
        let mut queues = EpochBagQueues::new();

        // retire one record more than fits into a single bag, so the first
        // queue ends up with one full bag and a head storing one record
        for _ in 0..EPOCH_CACHE_SIZE + 1 {
            queues.retire_record(retired(), &mut pool);
        }

        // one complete cycle: only the last rotation arrives at the first
        // queue again, reclaims its full bag and returns it to the pool
        for _ in 0..BAG_QUEUE_COUNT {
            unsafe { queues.rotate_and_reclaim(&mut pool) };
        }

        assert_eq!(pool.0.len(), 1);
        assert_eq!(queues.queues[0].head.retired_records.len(), 1);

        // clean up the one record remaining in the head
        unsafe { queues.queues[0].head.reclaim_all() };
    }

    #[test]
    fn retire_by_age() {
        let mut pool = BagPool::new();
        let mut queues = EpochBagQueues::new();

        // retire one record per rotation; the number of rounds is a multiple
        // of the queue count, so the records are spread evenly and the head
        // of every queue ends up one record away from being full, while no
        // full bag is ever reclaimed and the current index is 0 again
        for _ in 0..BAG_QUEUE_COUNT * (EPOCH_CACHE_SIZE - 1) {
            queues.retire_record(retired(), &mut pool);
            unsafe { queues.rotate_and_reclaim(&mut pool) };
        }

        // the oldest queue is at index 1: its head becomes full and the pool
        // is empty, so the new head comes from the global allocator
        queues.retire_record_by_age(retired(), PossibleAge::TwoEpochs, &mut pool);
        assert_eq!(queues.curr_idx, 0);
        assert_eq!(queues.queues[1].head.retired_records.len(), 0);
        assert!(queues.queues[1].head.next.is_some());

        // rotating to index 1 reclaims the full bag there and returns it to
        // the pool
        unsafe { queues.rotate_and_reclaim(&mut pool) };
        assert_eq!(pool.0.len(), 1);

        // the previous queue is at index 0: its head becomes full and the new
        // head is taken from the pool
        queues.retire_record_by_age(retired(), PossibleAge::OneEpoch, &mut pool);
        assert_eq!(queues.curr_idx, 1);
        assert_eq!(queues.queues[0].head.retired_records.len(), 0);
        assert!(queues.queues[0].head.next.is_some());
        assert_eq!(pool.0.len(), 0);

        // rotating to index 2 reclaims nothing, there is no full bag
        unsafe { queues.rotate_and_reclaim(&mut pool) };

        // the current queue is at index 2: its head becomes full and, as the
        // pool is empty at this point, the new head comes from the global
        // allocator
        queues.retire_record_by_age(retired(), PossibleAge::SameEpoch, &mut pool);
        assert_eq!(queues.curr_idx, 2);
        assert_eq!(pool.0.len(), 0);
        assert_eq!(queues.queues[2].head.retired_records.len(), 0);
        assert!(queues.queues[2].head.next.is_some());

        // rotating to index 0 reclaims the full bag there and returns it to
        // the pool
        unsafe { queues.rotate_and_reclaim(&mut pool) };
        assert_eq!(queues.curr_idx, 0);
        assert_eq!(pool.0.len(), 1);

        // rotating to index 1 reclaims nothing
        unsafe { queues.rotate_and_reclaim(&mut pool) };
        assert_eq!(queues.curr_idx, 1);

        // rotating to index 2 reclaims the full bag there and returns it to
        // the pool
        unsafe { queues.rotate_and_reclaim(&mut pool) };
        assert_eq!(queues.curr_idx, 2);
        assert_eq!(pool.0.len(), 2);

        // all epoch queues are empty again, so dropping them is fine
    }
}