queen_io/queue/
mpmc.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering::{Relaxed, Acquire, AcqRel};
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::io;
6
7use crate::plus::mpmc_queue;
8use crate::waker::Waker;
9use crate::epoll::{Ready, Source, Epoll, Token, EpollOpt};
10
11pub struct Queue<T: Send> {
12    inner: Arc<Inner<T>>
13}
14
15struct Inner<T> {
16    queue: mpmc_queue::Queue<T>,
17    pending: AtomicUsize,
18    waker: Waker
19}
20
21impl <T: Send> Queue<T> {
22    pub fn with_capacity(capacity: usize) -> io::Result<Queue<T>> {
23        Ok(Queue {
24            inner: Arc::new(Inner {
25                queue: mpmc_queue::Queue::with_capacity(capacity),
26                pending: AtomicUsize::new(0),
27                waker: Waker::new()?
28            })
29        })
30    }
31
32    fn inc(&self) -> io::Result<()> {
33        let cnt = self.inner.pending.fetch_add(1, Acquire);
34
35        if 0 == cnt {
36            self.inner.waker.set_readiness(Ready::readable())?;
37        }
38        Ok(())
39    }
40
41    fn dec(&self) -> io::Result<()> {
42        let first = self.inner.pending.load(Acquire);
43
44        if first == 1 {
45            self.inner.waker.set_readiness(Ready::empty())?;
46        }
47
48        let second = self.inner.pending.fetch_sub(1, AcqRel);
49
50        if first == 1 && second > 1 {
51            self.inner.waker.set_readiness(Ready::readable())?;
52        }
53
54        Ok(())
55    }
56
57    pub fn push(&self, value: T) -> Result<(), T> {
58        self.inner.queue.push(value).and_then(|_| {let _ = self.inc(); Ok(())})
59    }
60
61    pub fn pop(&self) -> Option<T> {
62        self.inner.queue.pop().and_then(|res| {let _ = self.dec(); Some(res)})
63    }
64
65    pub fn pending(&self) -> usize {
66        self.inner.pending.load(Relaxed)
67    }
68}
69
70impl<T: Send> Clone for Queue<T> {
71    fn clone(&self) -> Queue<T> {
72        Queue {
73            inner: self.inner.clone()
74        }
75    }
76}
77
78impl<T: Send> AsRawFd for Queue<T> {
79    fn as_raw_fd(&self) -> RawFd {
80        self.inner.waker.as_raw_fd()
81    }
82}
83
84impl<T: Send> Source for Queue<T> {
85    fn add(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
86        self.inner.waker.add(epoll, token, interest, opts)?;
87
88        if self.inner.pending.load(Relaxed) > 0 {
89            self.inner.waker.set_readiness(Ready::readable())?;
90        }
91
92        Ok(())
93    }
94
95    fn modify(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
96        self.inner.waker.modify(epoll, token, interest, opts)
97    }
98
99    fn delete(&self, epoll: &Epoll) -> io::Result<()> {
100        self.inner.waker.delete(epoll)
101    }
102}