queue_rs/queue/
queue_mpsc.rs1use crate::queue::{Element, QueueBehavior, QueueError, QueueSize};
2use std::fmt::Debug;
3use std::sync::mpsc::{channel, Receiver, SendError, Sender, TryRecvError};
4use std::sync::{Arc, Mutex};
5
6use anyhow::Result;
7
8#[derive(Debug, Clone)]
9pub struct QueueMPSC<E> {
10 rx: Arc<Mutex<Receiver<E>>>,
11 tx: Sender<E>,
12 count: Arc<Mutex<QueueSize>>,
13 capacity: Arc<Mutex<QueueSize>>,
14}
15
16impl<E: Element + 'static> QueueBehavior<E> for QueueMPSC<E> {
17 fn len(&self) -> QueueSize {
18 let count_guard = self.count.lock().unwrap();
19 count_guard.clone()
20 }
21
22 fn capacity(&self) -> QueueSize {
23 let capacity_guard = self.capacity.lock().unwrap();
24 capacity_guard.clone()
25 }
26
27 fn offer(&mut self, e: E) -> anyhow::Result<()> {
28 match self.tx.send(e) {
29 Ok(_) => {
30 let mut count_guard = self.count.lock().unwrap();
31 count_guard.increment();
32 Ok(())
33 }
34 Err(SendError(e)) => Err(anyhow::Error::new(QueueError::OfferError(e))),
35 }
36 }
37
38 fn poll(&mut self) -> Result<Option<E>> {
39 let receiver_guard = self.rx.lock().unwrap();
40 match receiver_guard.try_recv() {
41 Ok(e) => {
42 let mut count_guard = self.count.lock().unwrap();
43 count_guard.decrement();
44 Ok(Some(e))
45 }
46 Err(TryRecvError::Empty) => Ok(None),
47 Err(TryRecvError::Disconnected) => Err(anyhow::Error::new(QueueError::<E>::PoolError)),
48 }
49 }
50}
51
52impl<E: Element + 'static> QueueMPSC<E> {
53 pub fn new() -> Self {
54 let (tx, rx) = channel();
55 Self {
56 rx: Arc::new(Mutex::new(rx)),
57 tx,
58 count: Arc::new(Mutex::new(QueueSize::Limited(0))),
59 capacity: Arc::new(Mutex::new(QueueSize::Limitless)),
60 }
61 }
62
63 pub fn with_num_elements(num_elements: usize) -> Self {
64 let (tx, rx) = channel();
65 Self {
66 rx: Arc::new(Mutex::new(rx)),
67 tx,
68 count: Arc::new(Mutex::new(QueueSize::Limited(0))),
69 capacity: Arc::new(Mutex::new(QueueSize::Limited(num_elements))),
70 }
71 }
72}