//! Examples of the claim mutual exclusion pattern, including an
//! example claim queue, which allows multiple workers to enqueue work
//! and ensures that exactly one worker is running whenever there is
//! work to be done.
//!
//! TODO: The example claim queue is strange, since it combines
//! a counter with the claim queue logic. This is a decent example
//! of composing semi-related algorithms with atomic_try_update,
//! but it is unclear whether the example is general-purpose enough
//! to be included here.
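//!
//! A minimal sketch of the worker-loop protocol (the `queue` and `buf`
//! bindings are hypothetical; see `WriteOrderingQueue::push` below for the
//! actual signature):
//!
//! ```ignore
//! let (offset, have_claim) = queue.push(buf);
//! if have_claim {
//!     // We hold the claim, so we are the unique active worker. Keep
//!     // draining until a call finds the queue empty and releases the
//!     // claim; until then, no other worker will run.
//!     loop {
//!         let (work, still_claimed) = queue.consume_or_release_claim();
//!         for _item in work {
//!             // Process the dequeued value, e.g. write it at its offset.
//!         }
//!         if !still_claimed {
//!             break;
//!         }
//!     }
//! }
//! ```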
use std::ptr::null_mut;
use super::{atomic_try_update, bits::FlagU64, Atom, Node, NodeIterator};
/// A special-purpose trait for `WriteOrderingQueue`: values report how many
/// units (e.g. bytes) they contribute to the queue's running count.
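///
/// A sketch of a possible implementor (the `Buf` type here is hypothetical;
/// anything that can report its own size works):
///
/// ```ignore
/// struct Buf(Vec<u8>);
///
/// impl Countable for Buf {
///     fn get_count(&self) -> u64 {
///         self.0.len() as u64
///     }
/// }
/// ```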
pub trait Countable {
    fn get_count(&self) -> u64;
}
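/// Queue head state: the most recently pushed node, plus the running count
/// and the claim bit. The `u128` parameter of the `Atom` below suggests the
/// whole head packs into 128 bits, so `atomic_try_update` can replace the
/// pointer, count, and claim bit in a single atomic operation.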
struct CountingClaimHead<T: Countable> {
    next: *mut Node<T>,
    /// Number of bytes inserted into this queue so far (according to
    /// `Countable::get_count`). The flag is the claim bit. The invariant is
    /// that if the queue is non-empty, then it is claimed by something (so
    /// the claim bit is set). Strictly speaking, we could store the claim
    /// bit implicitly for this use case, but this is a common pattern, and
    /// we leave it explicit so this data structure can be used as example
    /// code.
    count_and_claim: FlagU64,
}
pub struct WriteOrderingQueue<T>
where
    T: Send + Countable,
{
    head: Atom<CountingClaimHead<T>, u128>,
}
impl<T> Default for WriteOrderingQueue<T>
where
    T: Send + Countable,
{
    fn default() -> WriteOrderingQueue<T> {
        WriteOrderingQueue::<T> {
            head: Atom::default(),
        }
    }
}
/// This is a multi-producer "claim" queue: any number of threads may push
/// concurrently, and the claim bit ensures that at most one consumer drains
/// it at a time.
impl<T> WriteOrderingQueue<T>
where
    T: Send + Countable,
{
    /// Returns the offset of the write, and `true` iff we now hold the claim.
    /// If we hold the claim, we are responsible for calling
    /// `consume_or_release_claim` until we manage to release it.
    pub fn push(&self, val: T) -> (u64, bool) {
        let sz = val.get_count();
        #[allow(unused_mut)]
        let mut node = Box::into_raw(Box::new(Node {
            val,
            next: std::ptr::null_mut(),
        }));
        unsafe {
            atomic_try_update(&self.head, |head: &mut CountingClaimHead<T>| {
                // Prepend the new node; the list is in reverse insertion order.
                (*node).next = head.next;
                head.next = node;
                let old_count = head.count_and_claim.get_val();
                // We take the claim iff nobody else already holds it.
                let have_claim = !head.count_and_claim.get_flag();
                // TODO: need to check for overflow without panicking.
                head.count_and_claim.set_val(old_count + sz);
                // Either the flag was already set, or we just took the claim
                // and need to set it.
                head.count_and_claim.set_flag(true);
                (true, (old_count, have_claim))
            })
            // Can safely panic on overflow here, outside the update closure.
        }
    }
    /// Removes everything from the queue. If the queue is already empty,
    /// this releases the claim and returns `false` as the second element
    /// of the tuple.
    pub fn consume_or_release_claim(&self) -> (NodeIterator<T>, bool) {
        let (node, had_claim, claimed) = unsafe {
            atomic_try_update(&self.head, |head| {
                let ret = head.next;
                let had_claim = head.count_and_claim.get_flag();
                head.next = null_mut();
                if ret.is_null() {
                    // The queue is empty, so release the claim.
                    head.count_and_claim.set_flag(false);
                    (true, (ret, had_claim, false)) // no longer have claim
                } else {
                    (true, (ret, had_claim, true))
                }
            })
        };
        assert!(
            had_claim,
            "cannot call consume_or_release_claim unless you have the claim!"
        );
        // Nodes were pushed LIFO; rev() yields them in insertion order.
        (NodeIterator::new(node).rev(), claimed)
    }
    /// Returns the current count (i.e. the offset of the next write)
    /// without modifying the queue.
    pub fn get_offset(&self) -> u64 {
        unsafe { atomic_try_update(&self.head, |head| (false, head.count_and_claim.get_val())) }
    }
}
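
// A sketch of the end-to-end protocol, assuming `NodeIterator` implements
// `Iterator` (as its name suggests). The flow below is single-threaded, so
// it illustrates the claim protocol rather than exercising concurrency.
#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical Countable implementor for the sketch.
    struct Buf(Vec<u8>);

    impl Countable for Buf {
        fn get_count(&self) -> u64 {
            self.0.len() as u64
        }
    }

    #[test]
    fn push_then_drain_releases_claim() {
        let queue = WriteOrderingQueue::<Buf>::default();

        // First push starts at offset 0 and takes the claim.
        let (off, have_claim) = queue.push(Buf(vec![0u8; 4]));
        assert_eq!(off, 0);
        assert!(have_claim);

        // Second push is ordered after the first and does not get the claim.
        let (off, have_claim) = queue.push(Buf(vec![0u8; 2]));
        assert_eq!(off, 4);
        assert!(!have_claim);

        // First drain returns the queued work and keeps the claim.
        let (work, still_claimed) = queue.consume_or_release_claim();
        let mut drained = 0;
        for _item in work {
            drained += 1;
        }
        assert_eq!(drained, 2);
        assert!(still_claimed);

        // Second drain finds the queue empty and releases the claim.
        let (_work, still_claimed) = queue.consume_or_release_claim();
        assert!(!still_claimed);

        // Draining does not reset the running count.
        assert_eq!(queue.get_offset(), 6);
    }
}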