Skip to main content

rocketmq_rust/
blocking_queue.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::VecDeque;
16
17use tokio::sync::Mutex;
18use tokio::sync::Notify;
19use tokio::time;
20
21/// A thread-safe bounded blocking queue. To replace Java `LinkedBlockingQueue`.
22///
23/// This queue allows multiple producers and consumers to add and remove items
24/// concurrently. It uses a `tokio::sync::Mutex` to ensure mutual exclusion and
25/// a `tokio::sync::Notify` to notify waiting tasks.
26pub struct BlockingQueue<T> {
27    /// The underlying queue storing the items.
28    queue: Mutex<VecDeque<T>>,
29    /// The maximum capacity of the queue.
30    capacity: usize,
31    /// A notification mechanism to wake up waiting tasks.
32    notify: Notify,
33}
34
35impl<T> BlockingQueue<T> {
36    /// Creates a new `BlockingQueue` with the specified capacity.
37    ///
38    /// # Arguments
39    ///
40    /// * `capacity` - The maximum number of items the queue can hold.
41    ///
42    /// # Returns
43    ///
44    /// A new instance of `BlockingQueue`.
45    pub fn new(capacity: usize) -> Self {
46        BlockingQueue {
47            queue: Mutex::new(VecDeque::with_capacity(capacity)),
48            capacity,
49            notify: Notify::new(),
50        }
51    }
52
53    /// Adds an item to the queue, waiting if necessary for space to become available.
54    ///
55    /// This method will block the current task until space is available in the queue.
56    ///
57    /// # Arguments
58    ///
59    /// * `item` - The item to be added to the queue.
60    pub async fn put(&self, item: T) {
61        loop {
62            {
63                let mut queue = self.queue.lock().await;
64                if queue.len() < self.capacity {
65                    queue.push_back(item);
66                    self.notify.notify_one(); // Notify only after successful push
67                    return;
68                }
69            }
70            self.notify.notified().await;
71        }
72    }
73
74    /// Attempts to add an item to the queue within a specified timeout.
75    ///
76    /// This method will block the current task until space is available in the queue
77    /// or the timeout is reached.
78    ///
79    /// # Arguments
80    ///
81    /// * `item` - The item to be added to the queue.
82    /// * `timeout` - The maximum duration to wait for space to become available.
83    ///
84    /// # Returns
85    ///
86    /// `true` if the item was added to the queue, `false` if the timeout was reached.
87    pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
88        time::timeout(timeout, self.put(item)).await.is_ok()
89    }
90
91    /// Removes and returns an item from the queue, waiting if necessary until an item is available.
92    ///
93    /// This method will block the current task until an item is available in the queue.
94    ///
95    /// # Returns
96    ///
97    /// The item removed from the queue.
98    pub async fn take(&self) -> T {
99        loop {
100            {
101                let mut queue = self.queue.lock().await;
102                if let Some(item) = queue.pop_front() {
103                    self.notify.notify_one(); // Notify only after successful pop
104                    return item;
105                }
106            }
107            self.notify.notified().await;
108        }
109    }
110
111    /// Attempts to remove and return an item from the queue within a specified timeout.
112    ///
113    /// This method will block the current task until an item is available in the queue
114    /// or the timeout is reached.
115    ///
116    /// # Arguments
117    ///
118    /// * `timeout` - The maximum duration to wait for an item to become available.
119    ///
120    /// # Returns
121    ///
122    /// `Some(item)` if an item was removed from the queue, `None` if the timeout was reached.
123    pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
124        time::timeout(timeout, self.take()).await.ok()
125    }
126
127    /// Attempts to remove and return an item from the queue without waiting.
128    ///
129    /// This method acquires a lock on the queue and attempts to remove an item.
130    /// If the queue is empty, it will notify waiting tasks.
131    ///
132    /// # Returns
133    ///
134    /// `Some(item)` if an item was removed from the queue, `None` if the queue was empty.
135    pub async fn try_poll(&self) -> Option<T> {
136        let mut queue = self.queue.lock().await;
137        let item = queue.pop_front();
138        if item.is_none() {
139            self.notify.notify_one(); // Notify only after successful pop
140        }
141        item
142    }
143
144    /// Checks if the queue is empty.
145    ///
146    /// This method acquires a lock on the queue and checks if it contains any items.
147    ///
148    /// # Returns
149    ///
150    /// `true` if the queue is empty, `false` otherwise.
151    pub async fn is_empty(&self) -> bool {
152        let queue = self.queue.lock().await;
153        queue.is_empty()
154    }
155}
156
#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    /// Time budget given to every timed queue operation in these tests.
    const WAIT: Duration = Duration::from_millis(100);

    /// An item stored with `put` is handed back by `take`.
    #[tokio::test]
    async fn put_item_in_queue() {
        let q = BlockingQueue::new(2);
        q.put(1).await;
        assert_eq!(q.take().await, 1);
    }

    /// `offer` succeeds when the queue has room.
    #[tokio::test]
    async fn offer_item_within_timeout() {
        let q = BlockingQueue::new(1);
        assert!(q.offer(1, WAIT).await);
    }

    /// `offer` reports failure when the queue stays full for the whole timeout.
    #[tokio::test]
    async fn offer_item_exceeds_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        let accepted = q.offer(2, WAIT).await;
        assert!(!accepted);
    }

    /// `poll` returns the stored item when one is available.
    #[tokio::test]
    async fn poll_item_within_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        assert_eq!(q.poll(WAIT).await, Some(1));
    }

    /// `poll` yields `None` when the queue stays empty for the whole timeout.
    #[tokio::test]
    async fn poll_item_exceeds_timeout() {
        let q = BlockingQueue::<()>::new(1);
        assert_eq!(q.poll(WAIT).await, None);
    }
}