1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering::{Relaxed, Acquire, AcqRel};
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::io;
6
7use crate::plus::mpmc_queue;
8use crate::waker::Waker;
9use crate::epoll::{Ready, Source, Epoll, Token, EpollOpt};
10
11pub struct Queue<T: Send> {
12 inner: Arc<Inner<T>>
13}
14
15struct Inner<T> {
16 queue: mpmc_queue::Queue<T>,
17 pending: AtomicUsize,
18 waker: Waker
19}
20
21impl <T: Send> Queue<T> {
22 pub fn with_capacity(capacity: usize) -> io::Result<Queue<T>> {
23 Ok(Queue {
24 inner: Arc::new(Inner {
25 queue: mpmc_queue::Queue::with_capacity(capacity),
26 pending: AtomicUsize::new(0),
27 waker: Waker::new()?
28 })
29 })
30 }
31
32 fn inc(&self) -> io::Result<()> {
33 let cnt = self.inner.pending.fetch_add(1, Acquire);
34
35 if 0 == cnt {
36 self.inner.waker.set_readiness(Ready::readable())?;
37 }
38 Ok(())
39 }
40
41 fn dec(&self) -> io::Result<()> {
42 let first = self.inner.pending.load(Acquire);
43
44 if first == 1 {
45 self.inner.waker.set_readiness(Ready::empty())?;
46 }
47
48 let second = self.inner.pending.fetch_sub(1, AcqRel);
49
50 if first == 1 && second > 1 {
51 self.inner.waker.set_readiness(Ready::readable())?;
52 }
53
54 Ok(())
55 }
56
57 pub fn push(&self, value: T) -> Result<(), T> {
58 self.inner.queue.push(value).and_then(|_| {let _ = self.inc(); Ok(())})
59 }
60
61 pub fn pop(&self) -> Option<T> {
62 self.inner.queue.pop().and_then(|res| {let _ = self.dec(); Some(res)})
63 }
64
65 pub fn pending(&self) -> usize {
66 self.inner.pending.load(Relaxed)
67 }
68}
69
70impl<T: Send> Clone for Queue<T> {
71 fn clone(&self) -> Queue<T> {
72 Queue {
73 inner: self.inner.clone()
74 }
75 }
76}
77
78impl<T: Send> AsRawFd for Queue<T> {
79 fn as_raw_fd(&self) -> RawFd {
80 self.inner.waker.as_raw_fd()
81 }
82}
83
84impl<T: Send> Source for Queue<T> {
85 fn add(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
86 self.inner.waker.add(epoll, token, interest, opts)?;
87
88 if self.inner.pending.load(Relaxed) > 0 {
89 self.inner.waker.set_readiness(Ready::readable())?;
90 }
91
92 Ok(())
93 }
94
95 fn modify(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
96 self.inner.waker.modify(epoll, token, interest, opts)
97 }
98
99 fn delete(&self, epoll: &Epoll) -> io::Result<()> {
100 self.inner.waker.delete(epoll)
101 }
102}