rocketmq_rust/
blocking_queue.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::VecDeque;
19
20use tokio::sync::Mutex;
21use tokio::sync::Notify;
22use tokio::time;
23
24/// A thread-safe bounded blocking queue. To replace Java `LinkedBlockingQueue`.
25///
26/// This queue allows multiple producers and consumers to add and remove items
27/// concurrently. It uses a `tokio::sync::Mutex` to ensure mutual exclusion and
28/// a `tokio::sync::Notify` to notify waiting tasks.
29pub struct BlockingQueue<T> {
30    /// The underlying queue storing the items.
31    queue: Mutex<VecDeque<T>>,
32    /// The maximum capacity of the queue.
33    capacity: usize,
34    /// A notification mechanism to wake up waiting tasks.
35    notify: Notify,
36}
37
38impl<T> BlockingQueue<T> {
39    /// Creates a new `BlockingQueue` with the specified capacity.
40    ///
41    /// # Arguments
42    ///
43    /// * `capacity` - The maximum number of items the queue can hold.
44    ///
45    /// # Returns
46    ///
47    /// A new instance of `BlockingQueue`.
48    pub fn new(capacity: usize) -> Self {
49        BlockingQueue {
50            queue: Mutex::new(VecDeque::with_capacity(capacity)),
51            capacity,
52            notify: Notify::new(),
53        }
54    }
55
56    /// Adds an item to the queue, waiting if necessary for space to become available.
57    ///
58    /// This method will block the current task until space is available in the queue.
59    ///
60    /// # Arguments
61    ///
62    /// * `item` - The item to be added to the queue.
63    pub async fn put(&self, item: T) {
64        loop {
65            {
66                let mut queue = self.queue.lock().await;
67                if queue.len() < self.capacity {
68                    queue.push_back(item);
69                    self.notify.notify_one(); // Notify only after successful push
70                    return;
71                }
72            }
73            self.notify.notified().await;
74        }
75    }
76
77    /// Attempts to add an item to the queue within a specified timeout.
78    ///
79    /// This method will block the current task until space is available in the queue
80    /// or the timeout is reached.
81    ///
82    /// # Arguments
83    ///
84    /// * `item` - The item to be added to the queue.
85    /// * `timeout` - The maximum duration to wait for space to become available.
86    ///
87    /// # Returns
88    ///
89    /// `true` if the item was added to the queue, `false` if the timeout was reached.
90    pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
91        time::timeout(timeout, self.put(item)).await.is_ok()
92    }
93
94    /// Removes and returns an item from the queue, waiting if necessary until an item is available.
95    ///
96    /// This method will block the current task until an item is available in the queue.
97    ///
98    /// # Returns
99    ///
100    /// The item removed from the queue.
101    pub async fn take(&self) -> T {
102        loop {
103            {
104                let mut queue = self.queue.lock().await;
105                if let Some(item) = queue.pop_front() {
106                    self.notify.notify_one(); // Notify only after successful pop
107                    return item;
108                }
109            }
110            self.notify.notified().await;
111        }
112    }
113
114    /// Attempts to remove and return an item from the queue within a specified timeout.
115    ///
116    /// This method will block the current task until an item is available in the queue
117    /// or the timeout is reached.
118    ///
119    /// # Arguments
120    ///
121    /// * `timeout` - The maximum duration to wait for an item to become available.
122    ///
123    /// # Returns
124    ///
125    /// `Some(item)` if an item was removed from the queue, `None` if the timeout was reached.
126    pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
127        time::timeout(timeout, self.take()).await.ok()
128    }
129
130    /// Attempts to remove and return an item from the queue without waiting.
131    ///
132    /// This method acquires a lock on the queue and attempts to remove an item.
133    /// If the queue is empty, it will notify waiting tasks.
134    ///
135    /// # Returns
136    ///
137    /// `Some(item)` if an item was removed from the queue, `None` if the queue was empty.
138    pub async fn try_poll(&self) -> Option<T> {
139        let mut queue = self.queue.lock().await;
140        let item = queue.pop_front();
141        if item.is_none() {
142            self.notify.notify_one(); // Notify only after successful pop
143        }
144        item
145    }
146
147    /// Checks if the queue is empty.
148    ///
149    /// This method acquires a lock on the queue and checks if it contains any items.
150    ///
151    /// # Returns
152    ///
153    /// `true` if the queue is empty, `false` otherwise.
154    pub async fn is_empty(&self) -> bool {
155        let queue = self.queue.lock().await;
156        queue.is_empty()
157    }
158}
159
#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    /// An item stored with `put` is handed back by `take`.
    #[tokio::test]
    async fn put_item_in_queue() {
        let q = BlockingQueue::new(2);
        q.put(1).await;
        assert_eq!(q.take().await, 1);
    }

    /// `offer` succeeds when the queue has free space.
    #[tokio::test]
    async fn offer_item_within_timeout() {
        let q = BlockingQueue::new(1);
        let accepted = q.offer(1, Duration::from_millis(100)).await;
        assert!(accepted);
    }

    /// `offer` reports failure when the queue stays full for the whole timeout.
    #[tokio::test]
    async fn offer_item_exceeds_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        let accepted = q.offer(2, Duration::from_millis(100)).await;
        assert!(!accepted);
    }

    /// `poll` yields the queued item when one is already present.
    #[tokio::test]
    async fn poll_item_within_timeout() {
        let q = BlockingQueue::new(1);
        q.put(1).await;
        assert_eq!(q.poll(Duration::from_millis(100)).await, Some(1));
    }

    /// `poll` yields `None` when nothing arrives before the timeout.
    #[tokio::test]
    async fn poll_item_exceeds_timeout() {
        let q: BlockingQueue<()> = BlockingQueue::new(1);
        assert_eq!(q.poll(Duration::from_millis(100)).await, None);
    }
}