1use std::io::{self, Read, Result, Write};
8use std::os::fd::{AsFd, BorrowedFd};
9use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use futures_lite::ready;
14use tokio::io::unix::AsyncFd;
15use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
16
17struct Inner(RawFd);
18
19impl Inner {
20 fn new(init: u32, is_semaphore: bool) -> Result<Self> {
21 let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
22 let flags = if is_semaphore {
23 flags | libc::EFD_SEMAPHORE
24 } else {
25 flags
26 };
27 let rv = unsafe { libc::eventfd(init, flags) };
28 if rv < 0 {
29 return Err(io::Error::last_os_error());
30 }
31 Ok(Inner(rv))
32 }
33
34 fn try_clone(&self) -> Result<Self> {
35 let rv = unsafe { libc::dup(self.0) };
36 if rv < 0 {
37 return Err(io::Error::last_os_error());
38 }
39 Ok(Inner(rv))
40 }
41}
42
43impl Drop for Inner {
44 fn drop(&mut self) {
45 unsafe { libc::close(self.0) };
46 }
47}
48
49impl AsRawFd for Inner {
50 fn as_raw_fd(&self) -> RawFd {
51 self.0
52 }
53}
54
55impl<'a> io::Read for &'a Inner {
56 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
57 let rv =
58 unsafe { libc::read(self.0, buf.as_mut_ptr() as *mut std::ffi::c_void, buf.len()) };
59 if rv < 0 {
60 return Err(io::Error::last_os_error());
61 }
62 Ok(rv as usize)
63 }
64}
65
66impl<'a> io::Write for &'a Inner {
67 fn write(&mut self, buf: &[u8]) -> Result<usize> {
68 let rv = unsafe { libc::write(self.0, buf.as_ptr() as *const std::ffi::c_void, buf.len()) };
69 if rv < 0 {
70 return Err(io::Error::last_os_error());
71 }
72 Ok(rv as usize)
73 }
74
75 fn flush(&mut self) -> Result<()> {
76 Ok(())
77 }
78}
79
80pub struct EventFd(AsyncFd<Inner>);
81
82impl EventFd {
83 pub fn new(init: u32, is_semaphore: bool) -> Result<Self> {
90 let inner = Inner::new(init, is_semaphore)?;
91 Ok(EventFd(AsyncFd::new(inner)?))
92 }
93
94 pub fn try_clone(&self) -> Result<Self> {
95 let inner = self.0.get_ref().try_clone()?;
96 Ok(EventFd(AsyncFd::new(inner)?))
97 }
98}
99
100impl AsRawFd for EventFd {
101 fn as_raw_fd(&self) -> RawFd {
102 self.0.get_ref().0
103 }
104}
105
106impl FromRawFd for EventFd {
107 unsafe fn from_raw_fd(fd: RawFd) -> Self {
108 EventFd(AsyncFd::new(Inner(fd)).unwrap())
109 }
110}
111
112impl AsFd for EventFd {
113 fn as_fd(&self) -> BorrowedFd {
114 self.0.as_fd()
115 }
116}
117
118impl AsyncRead for EventFd {
119 fn poll_read(
120 self: Pin<&mut Self>,
121 cx: &mut Context<'_>,
122 buf: &mut ReadBuf<'_>,
123 ) -> Poll<Result<()>> {
124 loop {
125 let mut guard = ready!(self.0.poll_read_ready(cx))?;
126
127 let unfilled = buf.initialize_unfilled();
128 match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
129 Ok(Ok(len)) => {
130 buf.advance(len);
131 return Poll::Ready(Ok(()));
132 }
133 Ok(Err(err)) => return Poll::Ready(Err(err)),
134 Err(_would_block) => continue,
135 }
136 }
137 }
138}
139
140impl AsyncWrite for EventFd {
141 fn poll_write(
142 self: Pin<&mut Self>,
143 cx: &mut Context<'_>,
144 buf: &[u8],
145 ) -> Poll<io::Result<usize>> {
146 loop {
147 let mut guard = ready!(self.0.poll_write_ready(cx))?;
148
149 match guard.try_io(|inner| inner.get_ref().write(buf)) {
150 Ok(result) => return Poll::Ready(result),
151 Err(_would_block) => continue,
152 }
153 }
154 }
155
156 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
157 Poll::Ready(Ok(()))
158 }
159
160 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
161 Poll::Ready(Ok(()))
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use std::time::Duration;
169 use tokio::io::{AsyncReadExt, AsyncWriteExt};
170 use tokio::time::sleep;
171
172 #[tokio::test]
173 async fn not_semaphore_reads_and_resets() {
174 const VALUE: u64 = 42;
175
176 let mut writer = EventFd::new(0, false).unwrap();
177 let mut reader = writer.try_clone().unwrap();
178
179 writer.write(&VALUE.to_ne_bytes()).await.unwrap();
180 let mut buf = [0; 8];
181 reader.read(&mut buf).await.unwrap();
182 assert_eq!(buf, VALUE.to_ne_bytes());
183
184 let delay = sleep(Duration::from_secs(1));
186 let read_should_block = reader.read(&mut buf);
187 tokio::select! {
188 _ = delay => {},
189 val = read_should_block => {
190 panic!("{:?}", val)
191 },
192 }
193 }
194
195 #[tokio::test]
196 async fn semaphore_reads_ones() {
197 use tokio::io::{AsyncReadExt, AsyncWriteExt};
198
199 const VALUE: u64 = 42;
200
201 let mut writer = EventFd::new(0, true).unwrap();
202 let mut reader = writer.try_clone().unwrap();
203
204 writer.write(&VALUE.to_ne_bytes()).await.unwrap();
205 let mut buf = [0; 8];
206 for _ in 0..VALUE {
207 reader.read(&mut buf).await.unwrap();
208 assert_eq!(buf, 1u64.to_ne_bytes());
209 }
210
211 let delay = sleep(Duration::from_secs(1));
213 let read_should_block = reader.read(&mut buf);
214 tokio::select! {
215 _ = delay => {},
216 val = read_should_block => {
217 panic!("{:?}", val)
218 },
219 }
220 }
221
222 #[tokio::test]
223 async fn read_twice() {
224 let mut writer = EventFd::new(0, false).unwrap();
225 let mut reader = writer.try_clone().unwrap();
226 let (tx1, rx1) = tokio::sync::oneshot::channel();
227 let (tx2, rx2) = tokio::sync::oneshot::channel();
228
229 let server = tokio::spawn(async move {
230 let mut buf = [0; 8];
231 reader.read(&mut buf).await.unwrap();
232 tx1.send(()).unwrap();
233 reader.read(&mut buf).await.unwrap();
234 tx2.send(()).unwrap();
235 });
236
237 writer.write(&1u64.to_ne_bytes()).await.unwrap();
238 rx1.await.unwrap();
239 writer.write(&1u64.to_ne_bytes()).await.unwrap();
240 rx2.await.unwrap();
241
242 server.await.unwrap();
243 }
244}