rustpool/atomic_queue.rs
1//! Concurrent atomic object queue
2
3use crate::atomic_circular_buffer::AtomicCircularBuffer;
4use crate::buffer::RPBuffer;
5use crate::queue::RPQueue;
6use std::sync::Arc;
7
8use std::time::Duration;
9
10/// A thread safe, lock-free atomic queue. Objects are added to the queue via a closure and are owned by the queue as well
11/// This means objects in the queue will only be dropped when the queue's backing buffer is also destroyed.
12pub struct AtomicQueue<'a, T> {
13 buffer: AtomicCircularBuffer<'a, T>,
14}
15
16/// ```
17///
18/// use rustpool::AtomicQueue;
19/// use rustpool::RPQueue;
20
21/// fn main() {
22/// // create a blocking queue with 3 elements set to value 0
23/// let q = AtomicQueue::<isize>::new(3, || 0);
24///
25/// println!("simple blocking queue size {}", q.capacity());
26///
27/// // take the first element
28/// let v1 = q.take().unwrap();
29///
30/// println!("first element is {}", v1);
31///
32/// // remove the remaining items
33/// let v2 = q.take().unwrap();
34/// let v3 = q.take().unwrap();
35///
36/// // how many items available now?
37/// println!("items in the queue {}", q.available());
38///
39/// // no more items to return but the atomic queue will never block
40/// let empty = q.take_wait(std::time::Duration::from_secs(1));
41///
42/// assert_eq!(None, empty);
43///
44/// // put everything back
45/// q.offer(v1);
46/// q.offer(v2);
47/// q.offer(v3);
48///
49/// // how many items available now?
50/// println!("items in the queue {}", q.available());
51///
52/// let new_value: isize = 1;
53///
54/// // we can't add more items than the original capacity
55/// q.offer(&new_value);
56///
57/// // we still have the same number of available items
58/// println!("items in the queue {}", q.available());
59/// }
60/// ```
61
62impl<'a, T> AtomicQueue<'a, T> {
63 /// Create a new queue with fixed size `size` using custom create function
64 pub fn new(size: usize, f: impl FnMut() -> T) -> Self {
65 let buf = AtomicQueue::<T>::allocate(size, f);
66
67 return AtomicQueue { buffer: buf };
68 }
69
70 /// Create a new queue with fixed `size` wrapped as an atomic reference counter using a custom create function
71 pub fn new_arc(size: usize, f: impl FnMut() -> T) -> Arc<Self> {
72 let q = AtomicQueue::new(size, f);
73 return Arc::new(q);
74 }
75
76 fn allocate(size: usize, mut f: impl FnMut() -> T) -> AtomicCircularBuffer<'a, T> {
77 let mut buf: AtomicCircularBuffer<T> = AtomicCircularBuffer::new(size);
78 for _ in 0..size {
79 buf.add(f());
80 }
81 return buf;
82 }
83}
84
85impl<'a, T> RPQueue<'a, T> for AtomicQueue<'a, T> {
86 #[inline]
87 /// Returns the size of the queue in total number objects originally allocated
88 fn capacity(&self) -> usize {
89 return self.buffer.capacity();
90 }
91
92 #[inline]
93 /// Returns the number of available items in the buffer
94 fn available(&self) -> usize {
95 return self.buffer.available();
96 }
97
98 /// Returns an object from the queue or None if the pool is empty
99 #[inline]
100 fn take(&self) -> Option<&'a mut T> {
101 return self.buffer.take();
102 }
103
104 /// This method behaves the same as [`AtomicQueue::take`] as the atomic implementation will never block if the pool is empty
105 #[inline]
106 fn take_wait(&self, _timeout: Duration) -> Option<&'a mut T> {
107 return self.take();
108 }
109
110 /// Returns an object to the pool
111 #[inline]
112 fn offer(&self, item: &'a T) -> usize {
113 return self.buffer.offer(item);
114 }
115}
116
117unsafe impl<'a, T> Sync for AtomicQueue<'a, T> {}
118unsafe impl<'a, T> Send for AtomicQueue<'a, T> {}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123 use std::collections::HashMap;
124
125 #[test]
126 fn test_aq_take_offer() {
127 let mut count = 0;
128 let multiplier = 3;
129 let mut queue_items = HashMap::new();
130
131 let create_fn = || {
132 count = count + 1;
133 queue_items.insert(count, 0);
134 return count;
135 };
136
137 let queue = AtomicQueue::new_arc(11, create_fn);
138
139 for _ in 0..count * multiplier {
140 let item = queue.take().unwrap();
141
142 let current = queue_items[item];
143 queue_items.insert(*item, current + 1);
144 queue.offer(item);
145 }
146
147 for _ in 0..count {
148 let item = queue.take().unwrap();
149
150 let current = queue_items.remove(item);
151
152 assert_eq!(multiplier, current.unwrap());
153 }
154
155 assert_eq!(0, queue_items.len());
156 }
157}