Skip to main content

compio_runtime/fd/poll_fd/
mod.rs

1cfg_select! {
2    windows => {
3        #[path = "windows.rs"]
4        mod sys;
5    }
6    unix => {
7        #[path = "unix.rs"]
8        mod sys;
9    }
10    _ => {}
11}
12
13#[cfg(windows)]
14use std::os::windows::io::{AsRawSocket, RawSocket};
15use std::{
16    future::poll_fn,
17    io,
18    ops::Deref,
19    pin::Pin,
20    task::{Context, Poll},
21};
22
23use compio_buf::IntoInner;
24use compio_driver::{AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd};
25
26/// Providing functionalities to wait for readiness.
27#[derive(Debug)]
28pub struct PollFd<T: AsFd>(sys::PollFd<T>);
29
30impl<T: AsFd> PollFd<T> {
31    /// Create [`PollFd`] without attaching the source.
32    ///
33    /// Ready-based sources does not need to be attached.
34    pub fn new(source: T) -> io::Result<Self> {
35        Self::from_shared_fd(SharedFd::new(source))
36    }
37
38    /// Create [`PollFd`] from a shared file descriptor.
39    pub fn from_shared_fd(inner: SharedFd<T>) -> io::Result<Self> {
40        Ok(Self(sys::PollFd::new(inner)?))
41    }
42}
43
44impl<T: AsFd + 'static> PollFd<T> {
45    /// Wait for accept readiness, before calling `accept`, or after `accept`
46    /// returns `WouldBlock`.
47    pub async fn accept_ready(&self) -> io::Result<()> {
48        poll_fn(|cx| self.poll_accept_ready(cx)).await
49    }
50
51    /// Wait for connect readiness.
52    pub async fn connect_ready(&self) -> io::Result<()> {
53        poll_fn(|cx| self.poll_connect_ready(cx)).await
54    }
55
56    /// Wait for read readiness.
57    pub async fn read_ready(&self) -> io::Result<()> {
58        poll_fn(|cx| self.poll_read_ready(cx)).await
59    }
60
61    /// Wait for write readiness.
62    pub async fn write_ready(&self) -> io::Result<()> {
63        poll_fn(|cx| self.poll_write_ready(cx)).await
64    }
65
66    /// Poll for accept readiness.
67    pub fn poll_accept_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
68        self.0.poll_accept_ready(cx)
69    }
70
71    /// Poll for connect readiness.
72    pub fn poll_connect_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
73        self.0.poll_connect_ready(cx)
74    }
75
76    /// Poll for read readiness.
77    pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
78        self.0.poll_read_ready(cx)
79    }
80
81    /// Poll for write readiness.
82    pub fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
83        self.0.poll_write_ready(cx)
84    }
85
86    /// Poll for accept readiness and call the provided function.
87    pub fn poll_accept_with<R>(
88        &self,
89        cx: &mut Context,
90        mut f: impl FnMut(&T) -> io::Result<R>,
91    ) -> Poll<io::Result<R>> {
92        loop {
93            match f(&self.0) {
94                Ok(result) => break Poll::Ready(Ok(result)),
95                Err(e) if is_would_block(&e) => {
96                    std::task::ready!(self.poll_accept_ready(cx))?;
97                }
98                Err(e) => break Poll::Ready(Err(e)),
99            }
100        }
101    }
102
103    /// Poll for read readiness and call the provided function.
104    pub fn poll_read_with<R>(
105        &self,
106        cx: &mut Context,
107        mut f: impl FnMut(&T) -> io::Result<R>,
108    ) -> Poll<io::Result<R>> {
109        loop {
110            match f(&self.0) {
111                Ok(result) => break Poll::Ready(Ok(result)),
112                Err(e) if is_would_block(&e) => {
113                    std::task::ready!(self.poll_read_ready(cx))?;
114                }
115                Err(e) => break Poll::Ready(Err(e)),
116            }
117        }
118    }
119
120    /// Poll for write readiness and call the provided function.
121    pub fn poll_write_with<R>(
122        &self,
123        cx: &mut Context,
124        mut f: impl FnMut(&T) -> io::Result<R>,
125    ) -> Poll<io::Result<R>> {
126        loop {
127            match f(&self.0) {
128                Ok(result) => break Poll::Ready(Ok(result)),
129                Err(e) if is_would_block(&e) => {
130                    std::task::ready!(self.poll_write_ready(cx))?;
131                }
132                Err(e) => break Poll::Ready(Err(e)),
133            }
134        }
135    }
136}
137
138impl<T: AsFd + 'static> PollFd<T>
139where
140    for<'a> &'a T: std::io::Read,
141{
142    /// Poll for read readiness and read data.
143    pub fn poll_read(&self, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
144        self.poll_read_with(cx, |fd| std::io::Read::read(&mut &*fd, buf))
145    }
146
147    /// Poll for read readiness and read data into an uninitialized buffer.
148    #[cfg(feature = "read_buf")]
149    pub fn poll_read_buf(
150        &self,
151        cx: &mut Context,
152        mut buf: std::io::BorrowedCursor<u8>,
153    ) -> Poll<io::Result<()>> {
154        self.poll_read_with(cx, |fd| std::io::Read::read_buf(&mut &*fd, buf.reborrow()))
155    }
156}
157
158impl<T: AsFd + 'static> PollFd<T>
159where
160    for<'a> &'a T: std::io::Write,
161{
162    /// Poll for write readiness and write data.
163    pub fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
164        self.poll_write_with(cx, |fd| std::io::Write::write(&mut &*fd, buf))
165    }
166}
167
168impl<T: AsFd> IntoInner for PollFd<T> {
169    type Inner = SharedFd<T>;
170
171    fn into_inner(self) -> Self::Inner {
172        self.0.into_inner()
173    }
174}
175
176impl<T: AsFd> ToSharedFd<T> for PollFd<T> {
177    fn to_shared_fd(&self) -> SharedFd<T> {
178        self.0.to_shared_fd()
179    }
180}
181
182impl<T: AsFd> AsFd for PollFd<T> {
183    fn as_fd(&self) -> BorrowedFd<'_> {
184        self.0.as_fd()
185    }
186}
187
188impl<T: AsFd> AsRawFd for PollFd<T> {
189    fn as_raw_fd(&self) -> RawFd {
190        self.0.as_raw_fd()
191    }
192}
193
194#[cfg(windows)]
195impl<T: AsFd + AsRawSocket> AsRawSocket for PollFd<T> {
196    fn as_raw_socket(&self) -> RawSocket {
197        self.0.as_raw_socket()
198    }
199}
200
201impl<T: AsFd> Deref for PollFd<T> {
202    type Target = T;
203
204    fn deref(&self) -> &Self::Target {
205        &self.0
206    }
207}
208
209fn is_would_block(e: &io::Error) -> bool {
210    #[cfg(unix)]
211    {
212        e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS)
213    }
214    #[cfg(not(unix))]
215    {
216        e.kind() == io::ErrorKind::WouldBlock
217    }
218}
219
220impl<T: AsFd + 'static> futures_util::AsyncRead for &PollFd<T>
221where
222    for<'a> &'a T: std::io::Read,
223{
224    fn poll_read(
225        self: Pin<&mut Self>,
226        cx: &mut Context<'_>,
227        buf: &mut [u8],
228    ) -> Poll<io::Result<usize>> {
229        (*self).poll_read(cx, buf)
230    }
231}
232
233impl<T: AsFd + 'static> futures_util::AsyncRead for PollFd<T>
234where
235    for<'a> &'a T: std::io::Read,
236{
237    fn poll_read(
238        self: Pin<&mut Self>,
239        cx: &mut Context<'_>,
240        buf: &mut [u8],
241    ) -> Poll<io::Result<usize>> {
242        (*self).poll_read(cx, buf)
243    }
244}
245
246impl<T: AsFd + 'static> futures_util::AsyncWrite for &PollFd<T>
247where
248    for<'a> &'a T: std::io::Write,
249{
250    fn poll_write(
251        self: Pin<&mut Self>,
252        cx: &mut Context<'_>,
253        buf: &[u8],
254    ) -> Poll<io::Result<usize>> {
255        (*self).poll_write(cx, buf)
256    }
257
258    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
259        Poll::Ready(Ok(()))
260    }
261
262    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
263        Poll::Ready(Ok(()))
264    }
265}
266
267impl<T: AsFd + 'static> futures_util::AsyncWrite for PollFd<T>
268where
269    for<'a> &'a T: std::io::Write,
270{
271    fn poll_write(
272        self: Pin<&mut Self>,
273        cx: &mut Context<'_>,
274        buf: &[u8],
275    ) -> Poll<io::Result<usize>> {
276        (*self).poll_write(cx, buf)
277    }
278
279    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
280        Poll::Ready(Ok(()))
281    }
282
283    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
284        Poll::Ready(Ok(()))
285    }
286}