// atomic_try_update/claim.rs
//! Examples of the claim mutual exclusion pattern, including an
//! example of the claim queue, which allows multiple workers to enqueue work
//! and ensures that exactly one worker is running if there is work
//! to be done.
//!
//! TODO: The example claim queue is strange, since it combines
//! a counter with the claim queue logic. This is a decent example
//! of composing semi-related algorithms with atomic_try_update,
//! but it is unclear whether the example is general-purpose enough
//! to be included here.
11
12use std::ptr::null_mut;
13
14use super::{atomic_try_update, bits::FlagU64, Atom, Node, NodeIterator};
/// A special purpose trait for WriteOrderingQueue.
///
/// Implementors report the "size" a value contributes to the queue's running
/// count (e.g. a byte count); `WriteOrderingQueue::push` accumulates these
/// into the offset it returns.
pub trait Countable {
    /// Returns this value's contribution to the queue's running count.
    fn get_count(&self) -> u64;
}
19
/// Head word for the counting claim queue: a list pointer plus a packed
/// (count, claim-bit) word. The whole struct is updated atomically as a
/// single u128 via `Atom` (see `WriteOrderingQueue::head`).
struct CountingClaimHead<T: Countable> {
    /// Intrusive LIFO list of pending entries (most recently pushed first).
    next: *mut Node<T>,
    /// Number of bytes inserted into this queue so far (according to Countable::get_count).
    /// The flag is the claim bit. The invariant is that if the queue is non-empty, then
    /// it is claimed by something (so the claim bit is set). Strictly speaking, we could
    /// store the claim bit implicitly for this use case, but this is a common pattern, and
    /// we leave it explicit so this data structure can be used as example code.
    count_and_claim: FlagU64,
}
29
/// A multi-producer "claim" queue that also tracks a running total (offset)
/// of everything ever pushed, as measured by [`Countable::get_count`].
pub struct WriteOrderingQueue<T>
where
    T: Send + Countable,
{
    /// Atomic (pointer, count+claim) head, swapped as one u128 word.
    head: Atom<CountingClaimHead<T>, u128>,
}
36
37impl<T> Default for WriteOrderingQueue<T>
38where
39 T: Send + Countable,
40{
41 fn default() -> WriteOrderingQueue<T> {
42 WriteOrderingQueue::<T> {
43 head: Atom::default(),
44 }
45 }
46}
47
/// This is a multi-producer "claim" queue.
///
/// Any thread may `push`. The pusher that observes the claim bit clear
/// acquires the claim and becomes the single party responsible for draining
/// the queue via `consume_or_release_claim` until the claim is released.
impl<T> WriteOrderingQueue<T>
where
    T: Send + Countable,
{
    /// This returns the offset of the write (the running count before this
    /// push), and true iff we have the claim.
    /// If we have the claim, we are responsible for calling consume_or_release_claim
    /// until we manage to release it.
    pub fn push(&self, val: T) -> (u64, bool) {
        let sz = val.get_count();
        #[allow(unused_mut)]
        let mut node = Box::into_raw(Box::new(Node {
            val,
            next: std::ptr::null_mut(),
        }));

        unsafe {
            // NOTE(review): the closure presumably re-runs on CAS contention
            // (atomic_try_update retry loop) — each attempt re-links `node`
            // against the freshly observed head, so re-execution is safe.
            // TODO: confirm against atomic_try_update's documented contract.
            atomic_try_update(&self.head, |head: &mut CountingClaimHead<T>| {
                // Prepend our node to the intrusive LIFO list.
                (*node).next = head.next;
                head.next = node;
                let old_count = head.count_and_claim.get_val();
                // If the claim bit was clear before this push, we acquire it.
                let have_claim = !head.count_and_claim.get_flag();
                // TODO: need to check for overflow without panic
                head.count_and_claim.set_val(old_count + sz);
                head.count_and_claim.set_flag(true); // either it was already set to true, or we need to set it to true!
                // `true` commits the mutation; the pair is handed to the caller.
                (true, (old_count, have_claim))
            })
            // Can safely panic on overflow here.
        }
    }
    /// This removes everything from the queue. If queue is already empty, it
    /// releases the claim and returns false (as the second tuple element),
    /// meaning the caller no longer holds the claim.
    ///
    /// Only the current claim holder may call this; see the assert below.
    pub fn consume_or_release_claim(&self) -> (NodeIterator<T>, bool) {
        let (node, had_claim, claimed) = unsafe {
            atomic_try_update(&self.head, |head| {
                // Detach the entire list in one atomic swap.
                let ret = head.next;
                let had_claim = head.count_and_claim.get_flag();
                head.next = null_mut();
                if ret.is_null() {
                    // Nothing queued: drop the claim so a future pusher takes it.
                    head.count_and_claim.set_flag(false);
                    (true, (ret, had_claim, false)) // no longer have claim
                } else {
                    (true, (ret, had_claim, true))
                }
            })
        };
        // NOTE(review): this contract check fires *after* the update was
        // applied, so a misuse still mutated the head before panicking.
        assert!(
            had_claim,
            "cannot call consume_or_release_claim unless you have the claim!"
        );
        // The detached list is LIFO; .rev() yields entries in push (FIFO) order.
        (NodeIterator::new(node).rev(), claimed)
    }

    /// Returns the running count (offset) without modifying the queue.
    pub fn get_offset(&self) -> u64 {
        // Returning `false` from the closure skips the write-back, making this
        // a read-only snapshot (contrast with push/consume, which return true).
        unsafe { atomic_try_update(&self.head, |head| (false, head.count_and_claim.get_val())) }
    }
}
103}