queue_rs/queue/
queue_mpsc.rs

1use crate::queue::{Element, QueueBehavior, QueueError, QueueSize};
2use std::fmt::Debug;
3use std::sync::mpsc::{channel, Receiver, SendError, Sender, TryRecvError};
4use std::sync::{Arc, Mutex};
5
6use anyhow::Result;
7
8#[derive(Debug, Clone)]
9pub struct QueueMPSC<E> {
10  rx: Arc<Mutex<Receiver<E>>>,
11  tx: Sender<E>,
12  count: Arc<Mutex<QueueSize>>,
13  capacity: Arc<Mutex<QueueSize>>,
14}
15
16impl<E: Element + 'static> QueueBehavior<E> for QueueMPSC<E> {
17  fn len(&self) -> QueueSize {
18    let count_guard = self.count.lock().unwrap();
19    count_guard.clone()
20  }
21
22  fn capacity(&self) -> QueueSize {
23    let capacity_guard = self.capacity.lock().unwrap();
24    capacity_guard.clone()
25  }
26
27  fn offer(&mut self, e: E) -> anyhow::Result<()> {
28    match self.tx.send(e) {
29      Ok(_) => {
30        let mut count_guard = self.count.lock().unwrap();
31        count_guard.increment();
32        Ok(())
33      }
34      Err(SendError(e)) => Err(anyhow::Error::new(QueueError::OfferError(e))),
35    }
36  }
37
38  fn poll(&mut self) -> Result<Option<E>> {
39    let receiver_guard = self.rx.lock().unwrap();
40    match receiver_guard.try_recv() {
41      Ok(e) => {
42        let mut count_guard = self.count.lock().unwrap();
43        count_guard.decrement();
44        Ok(Some(e))
45      }
46      Err(TryRecvError::Empty) => Ok(None),
47      Err(TryRecvError::Disconnected) => Err(anyhow::Error::new(QueueError::<E>::PoolError)),
48    }
49  }
50}
51
52impl<E: Element + 'static> QueueMPSC<E> {
53  pub fn new() -> Self {
54    let (tx, rx) = channel();
55    Self {
56      rx: Arc::new(Mutex::new(rx)),
57      tx,
58      count: Arc::new(Mutex::new(QueueSize::Limited(0))),
59      capacity: Arc::new(Mutex::new(QueueSize::Limitless)),
60    }
61  }
62
63  pub fn with_num_elements(num_elements: usize) -> Self {
64    let (tx, rx) = channel();
65    Self {
66      rx: Arc::new(Mutex::new(rx)),
67      tx,
68      count: Arc::new(Mutex::new(QueueSize::Limited(0))),
69      capacity: Arc::new(Mutex::new(QueueSize::Limited(num_elements))),
70    }
71  }
72}