remote_trait_object/
queue.rs

1use crossbeam::channel::{
2    bounded, Receiver,
3    RecvTimeoutError::{Disconnected, Timeout},
4    Sender,
5};
6use parking_lot::Mutex;
7
8/// Blocking concurrent Queue. (Crossbeam's queue doens't block)
9/// Please use the queue with Arc
10#[derive(Debug)]
11pub struct Queue<T> {
12    // sender is None only when the close is called
13    sender: Mutex<Option<Sender<T>>>,
14    // receiver is None only when the close is called
15    recver: Mutex<Option<Receiver<T>>>,
16}
17
18impl<T> Queue<T> {
19    pub fn new(size: usize) -> Self {
20        let (sender, recver) = bounded(size);
21        Queue {
22            sender: Mutex::new(Some(sender)),
23            recver: Mutex::new(Some(recver)),
24        }
25    }
26
27    pub fn push(&self, x: T) -> Result<(), QueueClosed> {
28        let guard = self.sender.lock();
29        let sender = guard.as_ref().ok_or(QueueClosed)?;
30        sender.send(x).map_err(|_| QueueClosed)
31    }
32    pub fn pop(&self, timeout: Option<std::time::Duration>) -> Result<T, PopError> {
33        let guard = self.recver.lock();
34        let recver = guard.as_ref().ok_or(PopError::QueueClosed)?;
35        if let Some(duration) = timeout {
36            recver.recv_timeout(duration).map_err(|err| match err {
37                Timeout => PopError::Timeout,
38                Disconnected => PopError::QueueClosed,
39            })
40        } else {
41            recver.recv().map_err(|_| PopError::QueueClosed)
42        }
43    }
44}
45
46#[derive(Debug)]
47pub struct QueueClosed;
48
49#[derive(Debug)]
50pub enum PopError {
51    Timeout,
52    QueueClosed,
53}