atomic_try_update/
claim.rs

1//! Examples of the claim mutual exclusion pattern, including an
2//! example of the claim_queue, which allows multiple workers to enqueue work
3//! and ensures that exactly one worker running if there is work
4//! to be done.
5//!
6//! TODO: The example claim queue is strange, since it combines
7//! a counter with the claim queue logic.  This is a decent example
8//! of composing semi-related algorithms with atomic_try_update,
9//! but it is unclear whether the example is general-purpose enough
10//! to be included here.
11
12use std::ptr::null_mut;
13
14use super::{atomic_try_update, bits::FlagU64, Atom, Node, NodeIterator};
15/// A special purpose trait for WriteOrderingQueue
16pub trait Countable {
17    fn get_count(&self) -> u64;
18}
19
20struct CountingClaimHead<T: Countable> {
21    next: *mut Node<T>,
22    /// Number of bytes inserted into this queue so far (according to Countable::get_count).
23    /// The flag is the claim bit. The invariant is that if the queue is non-empty, then
24    /// it is claimed by something (so the claim bit is set).  Strictly speaking, we could
25    /// store the claim bit implicitly for this use case, but this is a common pattern, and
26    /// we leave it explicit so this data structure can be used as example code.
27    count_and_claim: FlagU64,
28}
29
30pub struct WriteOrderingQueue<T>
31where
32    T: Send + Countable,
33{
34    head: Atom<CountingClaimHead<T>, u128>,
35}
36
37impl<T> Default for WriteOrderingQueue<T>
38where
39    T: Send + Countable,
40{
41    fn default() -> WriteOrderingQueue<T> {
42        WriteOrderingQueue::<T> {
43            head: Atom::default(),
44        }
45    }
46}
47
48/// This is a multi-producer "claim" queue.
49impl<T> WriteOrderingQueue<T>
50where
51    T: Send + Countable,
52{
53    /// This returns the offset of the write, and true iff we have the claim.
54    /// If we have the claim, we are responsible for calling consume_or_release_claim
55    /// until we manage to release it.
56    pub fn push(&self, val: T) -> (u64, bool) {
57        let sz = val.get_count();
58        #[allow(unused_mut)]
59        let mut node = Box::into_raw(Box::new(Node {
60            val,
61            next: std::ptr::null_mut(),
62        }));
63
64        unsafe {
65            atomic_try_update(&self.head, |head: &mut CountingClaimHead<T>| {
66                (*node).next = head.next;
67                head.next = node;
68                let old_count = head.count_and_claim.get_val();
69                let have_claim = !head.count_and_claim.get_flag();
70                // TODO: need to check for overflow without panic
71                head.count_and_claim.set_val(old_count + sz);
72                head.count_and_claim.set_flag(true); // either it was already set to true, or we need to set it to true!
73                (true, (old_count, have_claim))
74            })
75            // Can safely panic on overflow here.
76        }
77    }
78    /// This removes everything from the queue.  If queue is already empty, it releases the claim and returns false
79    pub fn consume_or_release_claim(&self) -> (NodeIterator<T>, bool) {
80        let (node, had_claim, claimed) = unsafe {
81            atomic_try_update(&self.head, |head| {
82                let ret = head.next;
83                let had_claim = head.count_and_claim.get_flag();
84                head.next = null_mut();
85                if ret.is_null() {
86                    head.count_and_claim.set_flag(false);
87                    (true, (ret, had_claim, false)) // no longer have claim
88                } else {
89                    (true, (ret, had_claim, true))
90                }
91            })
92        };
93        assert!(
94            had_claim,
95            "cannot call consume_or_release_claim unless you have the claim!"
96        );
97        (NodeIterator::new(node).rev(), claimed)
98    }
99
100    pub fn get_offset(&self) -> u64 {
101        unsafe { atomic_try_update(&self.head, |head| (false, head.count_and_claim.get_val())) }
102    }
103}