queue_rs/queue/
blocking_queue.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::sync::{Arc, Condvar, Mutex};
4
5use anyhow::Result;
6
7use crate::queue::{BlockingQueueBehavior, Element, QueueBehavior, QueueSize};
8
9#[derive(Debug, Clone)]
10pub struct BlockingQueue<E, Q: QueueBehavior<E>> {
11  underlying: Arc<(Mutex<Q>, Condvar, Condvar)>,
12  p: PhantomData<E>,
13}
14
15impl<E: Element + 'static, Q: QueueBehavior<E>> QueueBehavior<E> for BlockingQueue<E, Q> {
16  fn len(&self) -> QueueSize {
17    let (queue_vec_mutex, _, _) = &*self.underlying;
18    let queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
19    queue_vec_mutex_guard.len()
20  }
21
22  fn capacity(&self) -> QueueSize {
23    let (queue_vec_mutex, _, _) = &*self.underlying;
24    let queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
25    queue_vec_mutex_guard.capacity()
26  }
27
28  fn offer(&mut self, e: E) -> Result<()> {
29    let (queue_vec_mutex, _, not_empty) = &*self.underlying;
30    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
31    let result = queue_vec_mutex_guard.offer(e);
32    not_empty.notify_one();
33    result
34  }
35
36  fn poll(&mut self) -> Result<Option<E>> {
37    let (queue_vec_mutex, not_full, _) = &*self.underlying;
38    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
39    let result = queue_vec_mutex_guard.poll();
40    not_full.notify_one();
41    result
42  }
43
44  // fn peek(&self) -> Result<Option<E>> {
45  //   let (queue_vec_mutex, not_full, _) = &*self.underlying;
46  //   let queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
47  //   let result = queue_vec_mutex_guard.peek();
48  //   not_full.notify_one();
49  //   result
50  // }
51}
52
53impl<E: Element + 'static, Q: QueueBehavior<E>> BlockingQueueBehavior<E> for BlockingQueue<E, Q> {
54  fn put(&mut self, e: E) -> Result<()> {
55    let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
56    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
57    while queue_vec_mutex_guard.is_full() {
58      queue_vec_mutex_guard = not_full.wait(queue_vec_mutex_guard).unwrap();
59    }
60    let result = queue_vec_mutex_guard.offer(e);
61    not_empty.notify_one();
62    result
63  }
64
65  fn take(&mut self) -> Result<Option<E>> {
66    let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
67    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
68    while queue_vec_mutex_guard.is_empty() {
69      queue_vec_mutex_guard = not_empty.wait(queue_vec_mutex_guard).unwrap();
70    }
71    let result = queue_vec_mutex_guard.poll();
72    not_full.notify_one();
73    result
74  }
75}
76
77impl<E, Q: QueueBehavior<E>> BlockingQueue<E, Q> {
78  pub fn new(queue: Q) -> Self {
79    Self {
80      underlying: Arc::new((Mutex::new(queue), Condvar::new(), Condvar::new())),
81      p: PhantomData::default(),
82    }
83  }
84}