rocketmq_rust/blocking_queue.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use tokio::sync::Mutex;
18use tokio::sync::Notify;
19use tokio::time;
20
21/// A thread-safe bounded blocking queue. To replace Java `LinkedBlockingQueue`.
22///
23/// This queue allows multiple producers and consumers to add and remove items
24/// concurrently. It uses a `tokio::sync::Mutex` to ensure mutual exclusion and
25/// a `tokio::sync::Notify` to notify waiting tasks.
26pub struct BlockingQueue<T> {
27 /// The underlying queue storing the items.
28 queue: Mutex<VecDeque<T>>,
29 /// The maximum capacity of the queue.
30 capacity: usize,
31 /// A notification mechanism to wake up waiting tasks.
32 notify: Notify,
33}
34
35impl<T> BlockingQueue<T> {
36 /// Creates a new `BlockingQueue` with the specified capacity.
37 ///
38 /// # Arguments
39 ///
40 /// * `capacity` - The maximum number of items the queue can hold.
41 ///
42 /// # Returns
43 ///
44 /// A new instance of `BlockingQueue`.
45 pub fn new(capacity: usize) -> Self {
46 BlockingQueue {
47 queue: Mutex::new(VecDeque::with_capacity(capacity)),
48 capacity,
49 notify: Notify::new(),
50 }
51 }
52
53 /// Adds an item to the queue, waiting if necessary for space to become available.
54 ///
55 /// This method will block the current task until space is available in the queue.
56 ///
57 /// # Arguments
58 ///
59 /// * `item` - The item to be added to the queue.
60 pub async fn put(&self, item: T) {
61 loop {
62 {
63 let mut queue = self.queue.lock().await;
64 if queue.len() < self.capacity {
65 queue.push_back(item);
66 self.notify.notify_one(); // Notify only after successful push
67 return;
68 }
69 }
70 self.notify.notified().await;
71 }
72 }
73
74 /// Attempts to add an item to the queue within a specified timeout.
75 ///
76 /// This method will block the current task until space is available in the queue
77 /// or the timeout is reached.
78 ///
79 /// # Arguments
80 ///
81 /// * `item` - The item to be added to the queue.
82 /// * `timeout` - The maximum duration to wait for space to become available.
83 ///
84 /// # Returns
85 ///
86 /// `true` if the item was added to the queue, `false` if the timeout was reached.
87 pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
88 time::timeout(timeout, self.put(item)).await.is_ok()
89 }
90
91 /// Removes and returns an item from the queue, waiting if necessary until an item is available.
92 ///
93 /// This method will block the current task until an item is available in the queue.
94 ///
95 /// # Returns
96 ///
97 /// The item removed from the queue.
98 pub async fn take(&self) -> T {
99 loop {
100 {
101 let mut queue = self.queue.lock().await;
102 if let Some(item) = queue.pop_front() {
103 self.notify.notify_one(); // Notify only after successful pop
104 return item;
105 }
106 }
107 self.notify.notified().await;
108 }
109 }
110
111 /// Attempts to remove and return an item from the queue within a specified timeout.
112 ///
113 /// This method will block the current task until an item is available in the queue
114 /// or the timeout is reached.
115 ///
116 /// # Arguments
117 ///
118 /// * `timeout` - The maximum duration to wait for an item to become available.
119 ///
120 /// # Returns
121 ///
122 /// `Some(item)` if an item was removed from the queue, `None` if the timeout was reached.
123 pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
124 time::timeout(timeout, self.take()).await.ok()
125 }
126
127 /// Attempts to remove and return an item from the queue without waiting.
128 ///
129 /// This method acquires a lock on the queue and attempts to remove an item.
130 /// If the queue is empty, it will notify waiting tasks.
131 ///
132 /// # Returns
133 ///
134 /// `Some(item)` if an item was removed from the queue, `None` if the queue was empty.
135 pub async fn try_poll(&self) -> Option<T> {
136 let mut queue = self.queue.lock().await;
137 let item = queue.pop_front();
138 if item.is_none() {
139 self.notify.notify_one(); // Notify only after successful pop
140 }
141 item
142 }
143
144 /// Checks if the queue is empty.
145 ///
146 /// This method acquires a lock on the queue and checks if it contains any items.
147 ///
148 /// # Returns
149 ///
150 /// `true` if the queue is empty, `false` otherwise.
151 pub async fn is_empty(&self) -> bool {
152 let queue = self.queue.lock().await;
153 queue.is_empty()
154 }
155}
156
#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    /// Wait budget shared by every timeout-based test below.
    const WAIT: Duration = Duration::from_millis(100);

    /// An item stored with `put` is handed back by `take`.
    #[tokio::test]
    async fn put_item_in_queue() {
        let q = BlockingQueue::new(2);
        q.put(1).await;
        assert_eq!(q.take().await, 1);
    }

    /// `offer` succeeds when the queue still has room.
    #[tokio::test]
    async fn offer_item_within_timeout() {
        let q = BlockingQueue::new(1);
        assert!(q.offer(1, WAIT).await);
    }

    /// `offer` reports failure when the queue stays full for the whole timeout.
    #[tokio::test]
    async fn offer_item_exceeds_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        let accepted = q.offer(2, WAIT).await;
        assert!(!accepted);
    }

    /// `poll` returns the queued item when one is already present.
    #[tokio::test]
    async fn poll_item_within_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        assert_eq!(q.poll(WAIT).await, Some(1));
    }

    /// `poll` yields `None` when nothing arrives before the timeout.
    #[tokio::test]
    async fn poll_item_exceeds_timeout() {
        let q = BlockingQueue::<()>::new(1);
        assert_eq!(q.poll(WAIT).await, None);
    }
}