rocketmq_rust/blocking_queue.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::VecDeque;
19
20use tokio::sync::Mutex;
21use tokio::sync::Notify;
22use tokio::time;
23
24/// A thread-safe bounded blocking queue. To replace Java `LinkedBlockingQueue`.
25///
26/// This queue allows multiple producers and consumers to add and remove items
27/// concurrently. It uses a `tokio::sync::Mutex` to ensure mutual exclusion and
28/// a `tokio::sync::Notify` to notify waiting tasks.
29pub struct BlockingQueue<T> {
30 /// The underlying queue storing the items.
31 queue: Mutex<VecDeque<T>>,
32 /// The maximum capacity of the queue.
33 capacity: usize,
34 /// A notification mechanism to wake up waiting tasks.
35 notify: Notify,
36}
37
38impl<T> BlockingQueue<T> {
39 /// Creates a new `BlockingQueue` with the specified capacity.
40 ///
41 /// # Arguments
42 ///
43 /// * `capacity` - The maximum number of items the queue can hold.
44 ///
45 /// # Returns
46 ///
47 /// A new instance of `BlockingQueue`.
48 pub fn new(capacity: usize) -> Self {
49 BlockingQueue {
50 queue: Mutex::new(VecDeque::with_capacity(capacity)),
51 capacity,
52 notify: Notify::new(),
53 }
54 }
55
56 /// Adds an item to the queue, waiting if necessary for space to become available.
57 ///
58 /// This method will block the current task until space is available in the queue.
59 ///
60 /// # Arguments
61 ///
62 /// * `item` - The item to be added to the queue.
63 pub async fn put(&self, item: T) {
64 loop {
65 {
66 let mut queue = self.queue.lock().await;
67 if queue.len() < self.capacity {
68 queue.push_back(item);
69 self.notify.notify_one(); // Notify only after successful push
70 return;
71 }
72 }
73 self.notify.notified().await;
74 }
75 }
76
77 /// Attempts to add an item to the queue within a specified timeout.
78 ///
79 /// This method will block the current task until space is available in the queue
80 /// or the timeout is reached.
81 ///
82 /// # Arguments
83 ///
84 /// * `item` - The item to be added to the queue.
85 /// * `timeout` - The maximum duration to wait for space to become available.
86 ///
87 /// # Returns
88 ///
89 /// `true` if the item was added to the queue, `false` if the timeout was reached.
90 pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
91 time::timeout(timeout, self.put(item)).await.is_ok()
92 }
93
94 /// Removes and returns an item from the queue, waiting if necessary until an item is available.
95 ///
96 /// This method will block the current task until an item is available in the queue.
97 ///
98 /// # Returns
99 ///
100 /// The item removed from the queue.
101 pub async fn take(&self) -> T {
102 loop {
103 {
104 let mut queue = self.queue.lock().await;
105 if let Some(item) = queue.pop_front() {
106 self.notify.notify_one(); // Notify only after successful pop
107 return item;
108 }
109 }
110 self.notify.notified().await;
111 }
112 }
113
114 /// Attempts to remove and return an item from the queue within a specified timeout.
115 ///
116 /// This method will block the current task until an item is available in the queue
117 /// or the timeout is reached.
118 ///
119 /// # Arguments
120 ///
121 /// * `timeout` - The maximum duration to wait for an item to become available.
122 ///
123 /// # Returns
124 ///
125 /// `Some(item)` if an item was removed from the queue, `None` if the timeout was reached.
126 pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
127 time::timeout(timeout, self.take()).await.ok()
128 }
129
130 /// Attempts to remove and return an item from the queue without waiting.
131 ///
132 /// This method acquires a lock on the queue and attempts to remove an item.
133 /// If the queue is empty, it will notify waiting tasks.
134 ///
135 /// # Returns
136 ///
137 /// `Some(item)` if an item was removed from the queue, `None` if the queue was empty.
138 pub async fn try_poll(&self) -> Option<T> {
139 let mut queue = self.queue.lock().await;
140 let item = queue.pop_front();
141 if item.is_none() {
142 self.notify.notify_one(); // Notify only after successful pop
143 }
144 item
145 }
146
147 /// Checks if the queue is empty.
148 ///
149 /// This method acquires a lock on the queue and checks if it contains any items.
150 ///
151 /// # Returns
152 ///
153 /// `true` if the queue is empty, `false` otherwise.
154 pub async fn is_empty(&self) -> bool {
155 let queue = self.queue.lock().await;
156 queue.is_empty()
157 }
158}
159
#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    /// Longest time any timed queue operation in these tests may wait.
    const WAIT: Duration = Duration::from_millis(100);

    /// An item that was put can be taken back out unchanged.
    #[tokio::test]
    async fn put_item_in_queue() {
        let q = BlockingQueue::new(2);
        q.put(1).await;
        assert_eq!(q.take().await, 1);
    }

    /// `offer` succeeds immediately when there is free space.
    #[tokio::test]
    async fn offer_item_within_timeout() {
        let q = BlockingQueue::new(1);
        assert!(q.offer(1, WAIT).await);
    }

    /// `offer` reports failure when the queue stays full for the whole timeout.
    #[tokio::test]
    async fn offer_item_exceeds_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        let accepted = q.offer(2, WAIT).await;
        assert!(!accepted);
    }

    /// `poll` returns the queued item when one is already present.
    #[tokio::test]
    async fn poll_item_within_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        assert_eq!(q.poll(WAIT).await, Some(1));
    }

    /// `poll` yields `None` when nothing arrives before the timeout.
    #[tokio::test]
    async fn poll_item_exceeds_timeout() {
        let q: BlockingQueue<()> = BlockingQueue::new(1);
        let polled = q.poll(WAIT).await;
        assert_eq!(polled, None);
    }
}
203}