queue_rs/
queue.rs

1#[cfg(test)]
2extern crate env_logger;
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::sync::Arc;
7
8use anyhow::Result;
9use thiserror::Error;
10
11pub use blocking_queue::*;
12pub use queue_mpsc::*;
13pub use queue_vec::*;
14
15mod blocking_queue;
16mod queue_mpsc;
17mod queue_vec;
18
19pub trait Element: Debug + Clone + Send + Sync {}
20
21impl Element for i8 {}
22
23impl Element for i16 {}
24
25impl Element for i32 {}
26
27impl Element for i64 {}
28
29impl Element for u8 {}
30
31impl Element for u16 {}
32
33impl Element for u32 {}
34
35impl Element for u64 {}
36
37impl Element for usize {}
38
39impl Element for f32 {}
40
41impl Element for f64 {}
42
43impl Element for String {}
44
45impl<T: Debug + Clone + Send + Sync> Element for Box<T> {}
46
47impl<T: Debug + Clone + Send + Sync> Element for Arc<T> {}
48
49#[derive(Error, Debug)]
50pub enum QueueError<E> {
51  #[error("Failed to offer an element: {0:?}")]
52  OfferError(E),
53  #[error("Failed to pool an element")]
54  PoolError,
55  #[error("Failed to peek an element")]
56  PeekError,
57}
58
59#[derive(Debug, Clone)]
60pub enum QueueSize {
61  Limitless,
62  Limited(usize),
63}
64
65impl QueueSize {
66  fn increment(&mut self) {
67    match self {
68      QueueSize::Limited(c) => {
69        *c += 1;
70      }
71      _ => {}
72    }
73  }
74
75  fn decrement(&mut self) {
76    match self {
77      QueueSize::Limited(c) => {
78        *c -= 1;
79      }
80      _ => {}
81    }
82  }
83}
84
85impl PartialEq<Self> for QueueSize {
86  fn eq(&self, other: &Self) -> bool {
87    match (self, other) {
88      (QueueSize::Limitless, QueueSize::Limitless) => true,
89      (QueueSize::Limited(l), QueueSize::Limited(r)) => l == r,
90      _ => false,
91    }
92  }
93}
94
95impl PartialOrd<QueueSize> for QueueSize {
96  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
97    match (self, other) {
98      (QueueSize::Limitless, QueueSize::Limitless) => Some(Ordering::Equal),
99      (QueueSize::Limitless, _) => Some(Ordering::Greater),
100      (_, QueueSize::Limitless) => Some(Ordering::Less),
101      (QueueSize::Limited(l), QueueSize::Limited(r)) => l.partial_cmp(r),
102    }
103  }
104}
105
106pub trait QueueBehavior<E>: Send + Sized {
107  /// Returns whether this queue is empty.<br/>
108  /// このキューが空かどうかを返します。
109  fn is_empty(&self) -> bool {
110    self.len() == QueueSize::Limited(0)
111  }
112
113  /// Returns whether this queue is non-empty.<br/>
114  /// このキューが空でないかどうかを返します。
115  fn non_empty(&self) -> bool {
116    !self.is_empty()
117  }
118
119  /// Returns whether the queue size has reached its capacity.<br/>
120  /// このキューのサイズが容量まで到達したかどうかを返します。
121  fn is_full(&self) -> bool {
122    self.capacity() == self.len()
123  }
124
125  /// Returns whether the queue size has not reached its capacity.<br/>
126  /// このキューのサイズが容量まで到達してないかどうかを返します。
127  fn non_full(&self) -> bool {
128    !self.is_full()
129  }
130
131  /// Returns the length of this queue.<br/>
132  /// このキューの長さを返します。
133  fn len(&self) -> QueueSize;
134
135  /// Returns the capacity of this queue.<br/>
136  /// このキューの最大容量を返します。
137  fn capacity(&self) -> QueueSize;
138
139  /// The specified element will be inserted into this queue,
140  /// if the queue can be executed immediately without violating the capacity limit.<br/>
141  /// 容量制限に違反せずにすぐ実行できる場合は、指定された要素をこのキューに挿入します。
142  fn offer(&mut self, e: E) -> Result<()>;
143
144  /// Retrieves and deletes the head of the queue. Returns None if the queue is empty.<br/>
145  /// キューの先頭を取得および削除します。キューが空の場合は None を返します。
146  fn poll(&mut self) -> Result<Option<E>>;
147}
148
149pub trait HasPeekBehavior<E: Element>: QueueBehavior<E> {
150  /// Gets the head of the queue, but does not delete it. Returns None if the queue is empty.<br/>
151  /// キューの先頭を取得しますが、削除しません。キューが空の場合は None を返します。
152  fn peek(&self) -> Result<Option<E>>;
153}
154
155pub trait BlockingQueueBehavior<E: Element>: QueueBehavior<E> + Send {
156  /// Inserts the specified element into this queue. If necessary, waits until space is available.<br/>
157  /// 指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。
158  fn put(&mut self, e: E) -> Result<()>;
159
160  /// Retrieve the head of this queue and delete it. If necessary, wait until an element becomes available.<br/>
161  /// このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。
162  fn take(&mut self) -> Result<Option<E>>;
163}
164
165pub enum QueueType {
166  Vec,
167  MPSC,
168}
169
170#[derive(Debug, Clone)]
171pub enum Queue<T> {
172  Vec(QueueVec<T>),
173  MPSC(QueueMPSC<T>),
174}
175
176impl<T: Element + 'static> Queue<T> {
177  pub fn with_blocking(self) -> BlockingQueue<T, Queue<T>> {
178    BlockingQueue::new(self)
179  }
180}
181
182impl<T: Element + 'static> QueueBehavior<T> for Queue<T> {
183  fn len(&self) -> QueueSize {
184    match self {
185      Queue::Vec(inner) => inner.len(),
186      Queue::MPSC(inner) => inner.len(),
187    }
188  }
189
190  fn capacity(&self) -> QueueSize {
191    match self {
192      Queue::Vec(inner) => inner.capacity(),
193      Queue::MPSC(inner) => inner.capacity(),
194    }
195  }
196
197  fn offer(&mut self, e: T) -> Result<()> {
198    match self {
199      Queue::Vec(inner) => inner.offer(e),
200      Queue::MPSC(inner) => inner.offer(e),
201    }
202  }
203
204  fn poll(&mut self) -> Result<Option<T>> {
205    match self {
206      Queue::Vec(inner) => inner.poll(),
207      Queue::MPSC(inner) => inner.poll(),
208    }
209  }
210}
211
212pub fn create_queue<T: Element + 'static>(queue_type: QueueType, num_elements: Option<usize>) -> Queue<T> {
213  match (queue_type, num_elements) {
214    (QueueType::Vec, None) => Queue::Vec(QueueVec::<T>::new()),
215    (QueueType::Vec, Some(num)) => Queue::Vec(QueueVec::<T>::with_num_elements(num)),
216    (QueueType::MPSC, None) => Queue::MPSC(QueueMPSC::<T>::new()),
217    (QueueType::MPSC, Some(num)) => Queue::MPSC(QueueMPSC::<T>::with_num_elements(num)),
218  }
219}
220
221#[cfg(test)]
222mod tests {
223  use std::thread::sleep;
224  use std::time::Duration;
225  use std::{env, thread};
226
227  use fp_rust::sync::CountDownLatch;
228
229  use crate::queue::BlockingQueueBehavior;
230  use crate::queue::{create_queue, QueueBehavior, QueueType};
231
232  fn init_logger() {
233    env::set_var("RUST_LOG", "debug");
234    // env::set_var("RUST_LOG", "trace");
235    let _ = env_logger::try_init();
236  }
237
238  fn test_queue_vec<Q>(queue: Q)
239  where
240    Q: QueueBehavior<i32> + Clone + 'static, {
241    let cdl = CountDownLatch::new(1);
242    let cdl2 = cdl.clone();
243
244    let mut q1 = queue;
245    let mut q2 = q1.clone();
246
247    let max = 5;
248
249    let handler1 = thread::spawn(move || {
250      cdl2.countdown();
251      for i in 1..=max {
252        log::debug!("take: start: {}", i);
253        let n = q2.poll();
254        log::debug!("take: finish: {},{:?}", i, n);
255      }
256    });
257
258    cdl.wait();
259
260    let handler2 = thread::spawn(move || {
261      sleep(Duration::from_secs(3));
262
263      for i in 1..=max {
264        log::debug!("put: start: {}", i);
265        q1.offer(i).unwrap();
266        log::debug!("put: finish: {}", i);
267      }
268    });
269
270    handler1.join().unwrap();
271    handler2.join().unwrap();
272  }
273
274  fn test_blocking_queue_vec<Q>(queue: Q)
275  where
276    Q: BlockingQueueBehavior<i32> + Clone + 'static, {
277    let cdl = CountDownLatch::new(1);
278    let cdl2 = cdl.clone();
279
280    let mut bqv1 = queue;
281    let mut bqv2 = bqv1.clone();
282
283    let max = 5;
284
285    let handler1 = thread::spawn(move || {
286      cdl2.countdown();
287      for i in 1..=max {
288        log::debug!("take: start: {}", i);
289        let n = bqv2.take();
290        log::debug!("take: finish: {},{:?}", i, n);
291      }
292    });
293
294    cdl.wait();
295
296    let handler2 = thread::spawn(move || {
297      sleep(Duration::from_secs(3));
298
299      for i in 1..=max {
300        log::debug!("put: start: {}", i);
301        bqv1.offer(i).unwrap();
302        log::debug!("put: finish: {}", i);
303      }
304    });
305
306    handler1.join().unwrap();
307    handler2.join().unwrap();
308  }
309
310  #[test]
311  fn test() {
312    init_logger();
313
314    let q = create_queue(QueueType::Vec, Some(32));
315    test_queue_vec(q);
316
317    let bq = create_queue(QueueType::Vec, Some(32)).with_blocking();
318    test_blocking_queue_vec(bq);
319  }
320}