rustpool/
atomic_queue.rs

1//! Concurrent atomic object queue
2
3use crate::atomic_circular_buffer::AtomicCircularBuffer;
4use crate::buffer::RPBuffer;
5use crate::queue::RPQueue;
6use std::sync::Arc;
7
8use std::time::Duration;
9
/// A thread safe, lock-free atomic queue.
///
/// Objects are created by a closure when the queue is built and are owned by the queue as well.
/// This means objects in the queue will only be dropped when the queue's backing buffer is also destroyed.
pub struct AtomicQueue<'a, T> {
    // Backing circular buffer; every queue operation in this file delegates to it.
    buffer: AtomicCircularBuffer<'a, T>,
}
15
16/// ```
17///
18/// use rustpool::AtomicQueue;
19/// use rustpool::RPQueue;
20
21/// fn main() {
22///     // create a blocking queue with 3 elements set to value 0
23///     let q = AtomicQueue::<isize>::new(3, || 0);
24///
25///     println!("simple blocking queue size {}", q.capacity());
26///
27///     // take the first element
28///     let v1 = q.take().unwrap();
29///
30///     println!("first element is {}", v1);
31///
32///     // remove the remaining items
33///     let v2 = q.take().unwrap();
34///     let v3 = q.take().unwrap();
35///
36///     // how many items available now?
37///     println!("items in the queue {}", q.available());
38///
39///     // no more items to return but the atomic queue will never block
40///     let empty = q.take_wait(std::time::Duration::from_secs(1));
41///
42///     assert_eq!(None, empty);
43///
44///     // put everything back
45///     q.offer(v1);
46///     q.offer(v2);
47///     q.offer(v3);
48///
49///     // how many items available now?
50///     println!("items in the queue {}", q.available());
51///
52///     let new_value: isize = 1;
53///
54///     // we can't add more items than the original capacity
55///     q.offer(&new_value);
56///
57///     // we still have the same number of available items
58///     println!("items in the queue {}", q.available());
59/// }
60/// ```
61
62impl<'a, T> AtomicQueue<'a, T> {
63    /// Create a new queue with fixed size `size` using custom create function
64    pub fn new(size: usize, f: impl FnMut() -> T) -> Self {
65        let buf = AtomicQueue::<T>::allocate(size, f);
66
67        return AtomicQueue { buffer: buf };
68    }
69
70    /// Create a new queue with fixed `size` wrapped as an atomic reference counter using a custom create function
71    pub fn new_arc(size: usize, f: impl FnMut() -> T) -> Arc<Self> {
72        let q = AtomicQueue::new(size, f);
73        return Arc::new(q);
74    }
75
76    fn allocate(size: usize, mut f: impl FnMut() -> T) -> AtomicCircularBuffer<'a, T> {
77        let mut buf: AtomicCircularBuffer<T> = AtomicCircularBuffer::new(size);
78        for _ in 0..size {
79            buf.add(f());
80        }
81        return buf;
82    }
83}
84
85impl<'a, T> RPQueue<'a, T> for AtomicQueue<'a, T> {
86    #[inline]
87    /// Returns the size of the queue in total number objects originally allocated
88    fn capacity(&self) -> usize {
89        return self.buffer.capacity();
90    }
91
92    #[inline]
93    /// Returns the number of available items in the buffer
94    fn available(&self) -> usize {
95        return self.buffer.available();
96    }
97
98    /// Returns an object from the queue or None if the pool is empty
99    #[inline]
100    fn take(&self) -> Option<&'a mut T> {
101        return self.buffer.take();
102    }
103
104    /// This method behaves the same as [`AtomicQueue::take`] as the atomic implementation will never block if the pool is empty
105    #[inline]
106    fn take_wait(&self, _timeout: Duration) -> Option<&'a mut T> {
107        return self.take();
108    }
109
110    /// Returns an object to the pool
111    #[inline]
112    fn offer(&self, item: &'a T) -> usize {
113        return self.buffer.offer(item);
114    }
115}
116
117unsafe impl<'a, T> Sync for AtomicQueue<'a, T> {}
118unsafe impl<'a, T> Send for AtomicQueue<'a, T> {}
119
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Cycles the queue's items through take/offer and checks that every distinct
    /// item was handed out the same number of times.
    #[test]
    fn test_aq_take_offer() {
        let multiplier = 3;
        let mut count = 0;
        // item value -> number of times that item has been taken so far
        let mut queue_items = HashMap::new();

        // The factory numbers the items 1..=11 and registers each one with a zero tally.
        let queue = AtomicQueue::new_arc(11, || {
            count += 1;
            queue_items.insert(count, 0);
            count
        });

        let rounds = count * multiplier;
        for _ in 0..rounds {
            let item = queue.take().unwrap();
            let key = *item;

            let seen = queue_items[&key];
            queue_items.insert(key, seen + 1);
            queue.offer(item);
        }

        // Drain the queue: every item must have been taken `multiplier` times.
        for _ in 0..count {
            let item = queue.take().unwrap();

            let seen = queue_items.remove(&*item);

            assert_eq!(multiplier, seen.unwrap());
        }

        assert_eq!(0, queue_items.len());
    }
}
157}