yep_coc/
queue_meta.rs

1use std::sync::atomic::{AtomicU16, AtomicU64};
2
/// Handle to the metadata portion of the circular queue.
///
/// Every field is a borrowed reference rather than an owned value, because the
/// underlying atomics are expected to live in a memory region shared between
/// the parties using the queue. Copying this struct therefore copies only the
/// references; all copies observe the same atomics.
#[derive(Clone, Copy, Debug)]
pub struct YCQueueSharedMeta<'a> {
    /// Slot count of the queue (presumably the total number of slots — confirm
    /// against the code that initialises the shared region).
    pub(crate) slot_count: &'a AtomicU16,
    /// Slot size of the queue (presumably in bytes — confirm against the code
    /// that initialises the shared region).
    pub(crate) slot_size: &'a AtomicU16,
    /// Single atomic word of queue metadata (presumably the packed form of
    /// `YCQueueU64Meta` — confirm against callers).
    pub(crate) u64_meta: &'a AtomicU64,
    /// Ownership bitmap, one bit per slot: a `0` bit means the producer owns the
    /// slot, a `1` bit means the consumer owns it.
    pub(crate) ownership: &'a [AtomicU64],
}
13
/// Producer and consumer counters live here. This is all in one atomic u64 to allow for atomic updates of all
/// fields at once. This has the drawback that the fields occupy the same cache line. However, for a produce +
/// consume operation, both producer and consumer will need to synchronize on both producer and consumer counters,
/// so splitting it into separate cache lines may not be beneficial. It's likely hw dependent, but I don't know
/// without data first.
/// TODO: revisit this decision once I have the benchmarks and data.
///
/// Packed `u64` layout (least-significant bit first):
///
/// | bits   | field         |
/// |--------|---------------|
/// | 0..16  | `produce_idx` |
/// | 16..32 | `consume_idx` |
/// | 32..48 | `in_flight`   |
/// | 48..64 | unused: written as zero by `to_u64`, ignored by `from_u64` |
#[derive(Copy, Clone, Debug)]
pub(crate) struct YCQueueU64Meta {
    /// where to currently produce into
    pub(crate) produce_idx: u16,
    /// where to consume from
    pub(crate) consume_idx: u16,
    /// how many in-flight messages there are
    pub(crate) in_flight: u16,
}

impl YCQueueU64Meta {
    /// Decodes the packed `u64` representation into its three 16-bit fields.
    ///
    /// The top 16 bits of `value` are not part of the layout and are ignored.
    pub(crate) fn from_u64(value: u64) -> YCQueueU64Meta {
        // The `as u16` casts intentionally truncate: each one keeps exactly the
        // 16-bit lane that was shifted down into the low bits.
        let produce_idx = value as u16;
        let consume_idx = (value >> u16::BITS) as u16;
        let in_flight = (value >> (2 * u16::BITS)) as u16;

        YCQueueU64Meta {
            produce_idx,
            consume_idx,
            in_flight,
        }
    }

    /// Encodes the three fields into the packed `u64` representation.
    ///
    /// This is the exact inverse of [`YCQueueU64Meta::from_u64`] for any word
    /// whose top 16 bits are zero: only the three fields are written and the
    /// unused high lane is always left clear.
    pub(crate) fn to_u64(self) -> u64 {
        (u64::from(self.in_flight) << (2 * u16::BITS))
            | (u64::from(self.consume_idx) << u16::BITS)
            | u64::from(self.produce_idx)
    }
}
55
#[cfg(test)]
mod tests {
    use super::YCQueueU64Meta;

    /// Checks that an all-zero word decodes to all-zero fields, and that
    /// distinct non-zero field values survive an encode/decode round trip.
    #[test]
    fn test_u64_meta() {
        // An all-zero word yields all-zero fields and encodes back to zero.
        let zeroed = YCQueueU64Meta::from_u64(0);
        assert_eq!(zeroed.produce_idx, 0);
        assert_eq!(zeroed.consume_idx, 0);
        assert_eq!(zeroed.in_flight, 0);
        assert_eq!(zeroed.to_u64(), 0);

        // Distinct values per field so that a swapped lane would be noticed.
        let original = YCQueueU64Meta {
            produce_idx: 1,
            consume_idx: 2,
            in_flight: 3,
        };

        // Encode, then rebuild a second instance from the encoded word.
        let encoded = original.to_u64();
        let decoded = YCQueueU64Meta::from_u64(encoded);

        // The packed representation is stable across the round trip...
        assert_eq!(encoded, decoded.to_u64());

        // ...and so is every individual field.
        assert_eq!(original.produce_idx, decoded.produce_idx);
        assert_eq!(original.consume_idx, decoded.consume_idx);
        assert_eq!(original.in_flight, decoded.in_flight);
    }
}