queue_rs/queue/
blocking_queue.rs1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::sync::{Arc, Condvar, Mutex};
4
5use anyhow::Result;
6
7use crate::queue::{BlockingQueueBehavior, Element, QueueBehavior, QueueSize};
8
9#[derive(Debug, Clone)]
10pub struct BlockingQueue<E, Q: QueueBehavior<E>> {
11 underlying: Arc<(Mutex<Q>, Condvar, Condvar)>,
12 p: PhantomData<E>,
13}
14
15impl<E: Element + 'static, Q: QueueBehavior<E>> QueueBehavior<E> for BlockingQueue<E, Q> {
16 fn len(&self) -> QueueSize {
17 let (queue_vec_mutex, _, _) = &*self.underlying;
18 let queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
19 queue_vec_mutex_guard.len()
20 }
21
22 fn capacity(&self) -> QueueSize {
23 let (queue_vec_mutex, _, _) = &*self.underlying;
24 let queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
25 queue_vec_mutex_guard.capacity()
26 }
27
28 fn offer(&mut self, e: E) -> Result<()> {
29 let (queue_vec_mutex, _, not_empty) = &*self.underlying;
30 let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
31 let result = queue_vec_mutex_guard.offer(e);
32 not_empty.notify_one();
33 result
34 }
35
36 fn poll(&mut self) -> Result<Option<E>> {
37 let (queue_vec_mutex, not_full, _) = &*self.underlying;
38 let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
39 let result = queue_vec_mutex_guard.poll();
40 not_full.notify_one();
41 result
42 }
43
44 }
52
53impl<E: Element + 'static, Q: QueueBehavior<E>> BlockingQueueBehavior<E> for BlockingQueue<E, Q> {
54 fn put(&mut self, e: E) -> Result<()> {
55 let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
56 let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
57 while queue_vec_mutex_guard.is_full() {
58 queue_vec_mutex_guard = not_full.wait(queue_vec_mutex_guard).unwrap();
59 }
60 let result = queue_vec_mutex_guard.offer(e);
61 not_empty.notify_one();
62 result
63 }
64
65 fn take(&mut self) -> Result<Option<E>> {
66 let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
67 let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
68 while queue_vec_mutex_guard.is_empty() {
69 queue_vec_mutex_guard = not_empty.wait(queue_vec_mutex_guard).unwrap();
70 }
71 let result = queue_vec_mutex_guard.poll();
72 not_full.notify_one();
73 result
74 }
75}
76
77impl<E, Q: QueueBehavior<E>> BlockingQueue<E, Q> {
78 pub fn new(queue: Q) -> Self {
79 Self {
80 underlying: Arc::new((Mutex::new(queue), Condvar::new(), Condvar::new())),
81 p: PhantomData::default(),
82 }
83 }
84}