queen_io/queue/
spsc.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering::{Relaxed, Acquire, AcqRel};
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::io;
6
7use crate::plus::spsc_queue;
8use crate::waker::Waker;
9use crate::epoll::{Ready, Source, Epoll, Token, EpollOpt};
10
11pub struct Queue<T: Send> {
12    inner: Arc<Inner<T>>
13}
14
15struct Inner<T> {
16    queue: spsc_queue::Queue<T>,
17    pending: AtomicUsize,
18    waker: Waker
19}
20
21impl <T: Send> Queue<T> {
22    pub fn with_cache(bound: usize) -> io::Result<Queue<T>> {
23        Ok(Queue {
24            inner: Arc::new(Inner {
25                queue: unsafe { spsc_queue::Queue::with_additions(bound, (), ()) },
26                pending: AtomicUsize::new(0),
27                waker: Waker::new()?
28            })
29        })
30    }
31
32    fn inc(&self) -> io::Result<()> {
33        let cnt = self.inner.pending.fetch_add(1, Acquire);
34
35        if 0 == cnt {
36            self.inner.waker.set_readiness(Ready::readable())?;
37        }
38        Ok(())
39    }
40
41    fn dec(&self) -> io::Result<()> {
42        let first = self.inner.pending.load(Acquire);
43
44        if first == 1 {
45            self.inner.waker.set_readiness(Ready::empty())?;
46        }
47
48        let second = self.inner.pending.fetch_sub(1, AcqRel);
49
50        if first == 1 && second > 1 {
51            self.inner.waker.set_readiness(Ready::readable())?;
52        }
53
54        Ok(())
55    }
56
57    pub fn push(&self, value: T) {
58        self.inner.queue.push(value);
59        let _ = self.inc();
60    }
61
62    pub fn pop(&self) -> Option<T> {
63        self.inner.queue.pop().and_then(|res| {let _ = self.dec(); Some(res)})
64    }
65
66    pub fn pending(&self) -> usize {
67        self.inner.pending.load(Relaxed)
68    }
69}
70
71impl<T: Send> Clone for Queue<T> {
72    fn clone(&self) -> Queue<T> {
73        Queue {
74            inner: self.inner.clone()
75        }
76    }
77}
78
79impl<T: Send> AsRawFd for Queue<T> {
80    fn as_raw_fd(&self) -> RawFd {
81        self.inner.waker.as_raw_fd()
82    }
83}
84
85impl<T: Send> Source for Queue<T> {
86    fn add(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
87        self.inner.waker.add(epoll, token, interest, opts)?;
88
89        if self.inner.pending.load(Relaxed) > 0 {
90            self.inner.waker.set_readiness(Ready::readable())?;
91        }
92
93        Ok(())
94    }
95
96    fn modify(&self, epoll: &Epoll, token: Token, interest: Ready, opts: EpollOpt) -> io::Result<()> {
97        self.inner.waker.modify(epoll, token, interest, opts)
98    }
99
100    fn delete(&self, epoll: &Epoll) -> io::Result<()> {
101        self.inner.waker.delete(epoll)
102    }
103}