tokio_eventfd/
lib.rs

1//! This crate provides eventfd file-like objects support for tokio.
2//! eventfd object can be used as an event
3//! wait/notify mechanism by user-space applications, and by
4//! the kernel to notify user-space applications of events.
5//! The object contains an unsigned 64-bit integer counter
6//! that is maintained by the kernel.
7use std::io::{self, Read, Result, Write};
8use std::os::fd::{AsFd, BorrowedFd};
9use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use futures_lite::ready;
14use tokio::io::unix::AsyncFd;
15use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
16
17struct Inner(RawFd);
18
19impl Inner {
20    fn new(init: u32, is_semaphore: bool) -> Result<Self> {
21        let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
22        let flags = if is_semaphore {
23            flags | libc::EFD_SEMAPHORE
24        } else {
25            flags
26        };
27        let rv = unsafe { libc::eventfd(init, flags) };
28        if rv < 0 {
29            return Err(io::Error::last_os_error());
30        }
31        Ok(Inner(rv))
32    }
33
34    fn try_clone(&self) -> Result<Self> {
35        let rv = unsafe { libc::dup(self.0) };
36        if rv < 0 {
37            return Err(io::Error::last_os_error());
38        }
39        Ok(Inner(rv))
40    }
41}
42
43impl Drop for Inner {
44    fn drop(&mut self) {
45        unsafe { libc::close(self.0) };
46    }
47}
48
49impl AsRawFd for Inner {
50    fn as_raw_fd(&self) -> RawFd {
51        self.0
52    }
53}
54
55impl<'a> io::Read for &'a Inner {
56    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
57        let rv =
58            unsafe { libc::read(self.0, buf.as_mut_ptr() as *mut std::ffi::c_void, buf.len()) };
59        if rv < 0 {
60            return Err(io::Error::last_os_error());
61        }
62        Ok(rv as usize)
63    }
64}
65
66impl<'a> io::Write for &'a Inner {
67    fn write(&mut self, buf: &[u8]) -> Result<usize> {
68        let rv = unsafe { libc::write(self.0, buf.as_ptr() as *const std::ffi::c_void, buf.len()) };
69        if rv < 0 {
70            return Err(io::Error::last_os_error());
71        }
72        Ok(rv as usize)
73    }
74
75    fn flush(&mut self) -> Result<()> {
76        Ok(())
77    }
78}
79
80pub struct EventFd(AsyncFd<Inner>);
81
82impl EventFd {
83    /// Create new Eventfd. `init` is the initial value of the counter
84    /// `is_semaphore` determines eventfd behaviour:
85    ///   - if true and counter has non-zero value read returns 8 bytes containing the value 1,
86    ///   and the counter's value is decremented by 1
87    ///   - if false and counter has non-zero value read returns the value and the counter's value
88    ///   is reset to 0.
89    pub fn new(init: u32, is_semaphore: bool) -> Result<Self> {
90        let inner = Inner::new(init, is_semaphore)?;
91        Ok(EventFd(AsyncFd::new(inner)?))
92    }
93
94    pub fn try_clone(&self) -> Result<Self> {
95        let inner = self.0.get_ref().try_clone()?;
96        Ok(EventFd(AsyncFd::new(inner)?))
97    }
98}
99
100impl AsRawFd for EventFd {
101    fn as_raw_fd(&self) -> RawFd {
102        self.0.get_ref().0
103    }
104}
105
106impl FromRawFd for EventFd {
107    unsafe fn from_raw_fd(fd: RawFd) -> Self {
108        EventFd(AsyncFd::new(Inner(fd)).unwrap())
109    }
110}
111
112impl AsFd for EventFd {
113    fn as_fd(&self) -> BorrowedFd {
114        self.0.as_fd()
115    }
116}
117
118impl AsyncRead for EventFd {
119    fn poll_read(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122        buf: &mut ReadBuf<'_>,
123    ) -> Poll<Result<()>> {
124        loop {
125            let mut guard = ready!(self.0.poll_read_ready(cx))?;
126
127            let unfilled = buf.initialize_unfilled();
128            match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
129                Ok(Ok(len)) => {
130                    buf.advance(len);
131                    return Poll::Ready(Ok(()));
132                }
133                Ok(Err(err)) => return Poll::Ready(Err(err)),
134                Err(_would_block) => continue,
135            }
136        }
137    }
138}
139
140impl AsyncWrite for EventFd {
141    fn poll_write(
142        self: Pin<&mut Self>,
143        cx: &mut Context<'_>,
144        buf: &[u8],
145    ) -> Poll<io::Result<usize>> {
146        loop {
147            let mut guard = ready!(self.0.poll_write_ready(cx))?;
148
149            match guard.try_io(|inner| inner.get_ref().write(buf)) {
150                Ok(result) => return Poll::Ready(result),
151                Err(_would_block) => continue,
152            }
153        }
154    }
155
156    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
157        Poll::Ready(Ok(()))
158    }
159
160    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
161        Poll::Ready(Ok(()))
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use std::time::Duration;
169    use tokio::io::{AsyncReadExt, AsyncWriteExt};
170    use tokio::time::sleep;
171
172    #[tokio::test]
173    async fn not_semaphore_reads_and_resets() {
174        const VALUE: u64 = 42;
175
176        let mut writer = EventFd::new(0, false).unwrap();
177        let mut reader = writer.try_clone().unwrap();
178
179        writer.write(&VALUE.to_ne_bytes()).await.unwrap();
180        let mut buf = [0; 8];
181        reader.read(&mut buf).await.unwrap();
182        assert_eq!(buf, VALUE.to_ne_bytes());
183
184        // check it blocks on zero
185        let delay = sleep(Duration::from_secs(1));
186        let read_should_block = reader.read(&mut buf);
187        tokio::select! {
188            _ = delay => {},
189            val = read_should_block => {
190                panic!("{:?}", val)
191            },
192        }
193    }
194
195    #[tokio::test]
196    async fn semaphore_reads_ones() {
197        use tokio::io::{AsyncReadExt, AsyncWriteExt};
198
199        const VALUE: u64 = 42;
200
201        let mut writer = EventFd::new(0, true).unwrap();
202        let mut reader = writer.try_clone().unwrap();
203
204        writer.write(&VALUE.to_ne_bytes()).await.unwrap();
205        let mut buf = [0; 8];
206        for _ in 0..VALUE {
207            reader.read(&mut buf).await.unwrap();
208            assert_eq!(buf, 1u64.to_ne_bytes());
209        }
210
211        // check it blocks on zero
212        let delay = sleep(Duration::from_secs(1));
213        let read_should_block = reader.read(&mut buf);
214        tokio::select! {
215            _ = delay => {},
216            val = read_should_block => {
217                panic!("{:?}", val)
218            },
219        }
220    }
221
222    #[tokio::test]
223    async fn read_twice() {
224        let mut writer = EventFd::new(0, false).unwrap();
225        let mut reader = writer.try_clone().unwrap();
226        let (tx1, rx1) = tokio::sync::oneshot::channel();
227        let (tx2, rx2) = tokio::sync::oneshot::channel();
228
229        let server = tokio::spawn(async move {
230            let mut buf = [0; 8];
231            reader.read(&mut buf).await.unwrap();
232            tx1.send(()).unwrap();
233            reader.read(&mut buf).await.unwrap();
234            tx2.send(()).unwrap();
235        });
236
237        writer.write(&1u64.to_ne_bytes()).await.unwrap();
238        rx1.await.unwrap();
239        writer.write(&1u64.to_ne_bytes()).await.unwrap();
240        rx2.await.unwrap();
241
242        server.await.unwrap();
243    }
244}