1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering::{Relaxed, Acquire, AcqRel};
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::io;
6
7use crate::plus::spsc_queue;
8use crate::waker::Waker;
9use crate::epoll::{Ready, Source, Epoll, Token, EpollOpt};
10
11pub struct Queue<T: Send> {
12 inner: Arc<Inner<T>>
13}
14
15struct Inner<T> {
16 queue: spsc_queue::Queue<T>,
17 pending: AtomicUsize,
18 waker: Waker
19}
20
21impl <T: Send> Queue<T> {
22 pub fn with_cache(bound: usize) -> io::Result<Queue<T>> {
23 Ok(Queue {
24 inner: Arc::new(Inner {
25 queue: unsafe { spsc_queue::Queue::with_additions(bound, (), ()) },
26 pending: AtomicUsize::new(0),
27 waker: Waker::new()?
28 })
29 })
30 }
31
32 fn inc(&self) -> io::Result<()> {
33 let cnt = self.inner.pending.fetch_add(1, Acquire);
34
35 if 0 == cnt {
36 self.inner.waker.set_readiness(Ready::readable())?;
37 }
38 Ok(())
39 }
40
41 fn dec(&self) -> io::Result<()> {
42 let first = self.inner.pending.load(Acquire);
43
44 if first == 1 {
45 self.inner.waker.set_readiness(Ready::empty())?;
46 }
47
48 let second = self.inner.pending.fetch_sub(1, AcqRel);
49
50 if first == 1 && second > 1 {
51 self.inner.waker.set_readiness(Ready::readable())?;
52 }
53
54 Ok(())
55 }
56
57 pub fn push(&self, value: T) {
58 self.inner.queue.push(value);
59 let _ = self.inc();
60 }
61
62 pub fn pop(&self) -> Option<T> {
63 self.inner.queue.pop().and_then(|res| {let _ = self.dec(); Some(res)})
64 }
65
66 pub fn pending(&self) -> usize {
67 self.inner.pending.load(Relaxed)
68 }
69}
70
71impl<T: Send> Clone for Queue<T> {
72 fn clone(&self) -> Queue<T> {
73 Queue {
74 inner: self.inner.clone()
75 }
76 }
77}
78
79impl<T: Send> AsRawFd for Queue<T> {
80 fn as_raw_fd(&self) -> RawFd {
81 self.inner.waker.as_raw_fd()
82 }
83}
84
85impl<T: Send> Source for Queue<T> {
86 fn add(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
87 self.inner.waker.add(epoll, token, interest, opts)?;
88
89 if self.inner.pending.load(Relaxed) > 0 {
90 self.inner.waker.set_readiness(Ready::readable())?;
91 }
92
93 Ok(())
94 }
95
96 fn modify(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
97 self.inner.waker.modify(epoll, token, interest, opts)
98 }
99
100 fn delete(&self, epoll: &Epoll) -> io::Result<()> {
101 self.inner.waker.delete(epoll)
102 }
103}