compio_runtime/fd/poll_fd/
// Select the platform-specific backend and expose it under the common name
// `sys`, so the rest of this file can refer to `sys::PollFd` uniformly.
cfg_select! {
    // Windows targets: backend lives in `windows.rs`.
    windows => {
        #[path = "windows.rs"]
        mod sys;
    }
    // Unix targets: backend lives in `unix.rs`.
    unix => {
        #[path = "unix.rs"]
        mod sys;
    }
    // Any other target: no `sys` module is declared.
    _ => {}
}
12
13#[cfg(windows)]
14use std::os::windows::io::{AsRawSocket, RawSocket};
15use std::{
16 future::poll_fn,
17 io,
18 ops::Deref,
19 pin::Pin,
20 task::{Context, Poll},
21};
22
23use compio_buf::IntoInner;
24use compio_driver::{AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd};
25
/// Readiness-polling wrapper around a source of type `T`.
///
/// This is a newtype over the platform-specific `sys::PollFd<T>`; every
/// operation in this file is forwarded to that inner value.
#[derive(Debug)]
pub struct PollFd<T: AsFd>(sys::PollFd<T>);
29
30impl<T: AsFd> PollFd<T> {
31 pub fn new(source: T) -> io::Result<Self> {
35 Self::from_shared_fd(SharedFd::new(source))
36 }
37
38 pub fn from_shared_fd(inner: SharedFd<T>) -> io::Result<Self> {
40 Ok(Self(sys::PollFd::new(inner)?))
41 }
42}
43
44impl<T: AsFd + 'static> PollFd<T> {
45 pub async fn accept_ready(&self) -> io::Result<()> {
48 poll_fn(|cx| self.poll_accept_ready(cx)).await
49 }
50
51 pub async fn connect_ready(&self) -> io::Result<()> {
53 poll_fn(|cx| self.poll_connect_ready(cx)).await
54 }
55
56 pub async fn read_ready(&self) -> io::Result<()> {
58 poll_fn(|cx| self.poll_read_ready(cx)).await
59 }
60
61 pub async fn write_ready(&self) -> io::Result<()> {
63 poll_fn(|cx| self.poll_write_ready(cx)).await
64 }
65
66 pub fn poll_accept_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
68 self.0.poll_accept_ready(cx)
69 }
70
71 pub fn poll_connect_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
73 self.0.poll_connect_ready(cx)
74 }
75
76 pub fn poll_read_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
78 self.0.poll_read_ready(cx)
79 }
80
81 pub fn poll_write_ready(&self, cx: &mut Context) -> Poll<io::Result<()>> {
83 self.0.poll_write_ready(cx)
84 }
85
86 pub fn poll_accept_with<R>(
88 &self,
89 cx: &mut Context,
90 mut f: impl FnMut(&T) -> io::Result<R>,
91 ) -> Poll<io::Result<R>> {
92 loop {
93 match f(&self.0) {
94 Ok(result) => break Poll::Ready(Ok(result)),
95 Err(e) if is_would_block(&e) => {
96 std::task::ready!(self.poll_accept_ready(cx))?;
97 }
98 Err(e) => break Poll::Ready(Err(e)),
99 }
100 }
101 }
102
103 pub fn poll_read_with<R>(
105 &self,
106 cx: &mut Context,
107 mut f: impl FnMut(&T) -> io::Result<R>,
108 ) -> Poll<io::Result<R>> {
109 loop {
110 match f(&self.0) {
111 Ok(result) => break Poll::Ready(Ok(result)),
112 Err(e) if is_would_block(&e) => {
113 std::task::ready!(self.poll_read_ready(cx))?;
114 }
115 Err(e) => break Poll::Ready(Err(e)),
116 }
117 }
118 }
119
120 pub fn poll_write_with<R>(
122 &self,
123 cx: &mut Context,
124 mut f: impl FnMut(&T) -> io::Result<R>,
125 ) -> Poll<io::Result<R>> {
126 loop {
127 match f(&self.0) {
128 Ok(result) => break Poll::Ready(Ok(result)),
129 Err(e) if is_would_block(&e) => {
130 std::task::ready!(self.poll_write_ready(cx))?;
131 }
132 Err(e) => break Poll::Ready(Err(e)),
133 }
134 }
135 }
136}
137
138impl<T: AsFd + 'static> PollFd<T>
139where
140 for<'a> &'a T: std::io::Read,
141{
142 pub fn poll_read(&self, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
144 self.poll_read_with(cx, |fd| std::io::Read::read(&mut &*fd, buf))
145 }
146
147 #[cfg(feature = "read_buf")]
149 pub fn poll_read_buf(
150 &self,
151 cx: &mut Context,
152 mut buf: std::io::BorrowedCursor<u8>,
153 ) -> Poll<io::Result<()>> {
154 self.poll_read_with(cx, |fd| std::io::Read::read_buf(&mut &*fd, buf.reborrow()))
155 }
156}
157
158impl<T: AsFd + 'static> PollFd<T>
159where
160 for<'a> &'a T: std::io::Write,
161{
162 pub fn poll_write(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
164 self.poll_write_with(cx, |fd| std::io::Write::write(&mut &*fd, buf))
165 }
166}
167
168impl<T: AsFd> IntoInner for PollFd<T> {
169 type Inner = SharedFd<T>;
170
171 fn into_inner(self) -> Self::Inner {
172 self.0.into_inner()
173 }
174}
175
176impl<T: AsFd> ToSharedFd<T> for PollFd<T> {
177 fn to_shared_fd(&self) -> SharedFd<T> {
178 self.0.to_shared_fd()
179 }
180}
181
182impl<T: AsFd> AsFd for PollFd<T> {
183 fn as_fd(&self) -> BorrowedFd<'_> {
184 self.0.as_fd()
185 }
186}
187
188impl<T: AsFd> AsRawFd for PollFd<T> {
189 fn as_raw_fd(&self) -> RawFd {
190 self.0.as_raw_fd()
191 }
192}
193
#[cfg(windows)]
impl<T: AsFd + AsRawSocket> AsRawSocket for PollFd<T> {
    /// Returns the raw socket by forwarding to the backend's
    /// `as_raw_socket`.
    fn as_raw_socket(&self) -> RawSocket {
        let Self(backend) = self;
        backend.as_raw_socket()
    }
}
200
201impl<T: AsFd> Deref for PollFd<T> {
202 type Target = T;
203
204 fn deref(&self) -> &Self::Target {
205 &self.0
206 }
207}
208
209fn is_would_block(e: &io::Error) -> bool {
210 #[cfg(unix)]
211 {
212 e.kind() == io::ErrorKind::WouldBlock || e.raw_os_error() == Some(libc::EINPROGRESS)
213 }
214 #[cfg(not(unix))]
215 {
216 e.kind() == io::ErrorKind::WouldBlock
217 }
218}
219
220impl<T: AsFd + 'static> futures_util::AsyncRead for &PollFd<T>
221where
222 for<'a> &'a T: std::io::Read,
223{
224 fn poll_read(
225 self: Pin<&mut Self>,
226 cx: &mut Context<'_>,
227 buf: &mut [u8],
228 ) -> Poll<io::Result<usize>> {
229 (*self).poll_read(cx, buf)
230 }
231}
232
233impl<T: AsFd + 'static> futures_util::AsyncRead for PollFd<T>
234where
235 for<'a> &'a T: std::io::Read,
236{
237 fn poll_read(
238 self: Pin<&mut Self>,
239 cx: &mut Context<'_>,
240 buf: &mut [u8],
241 ) -> Poll<io::Result<usize>> {
242 (*self).poll_read(cx, buf)
243 }
244}
245
246impl<T: AsFd + 'static> futures_util::AsyncWrite for &PollFd<T>
247where
248 for<'a> &'a T: std::io::Write,
249{
250 fn poll_write(
251 self: Pin<&mut Self>,
252 cx: &mut Context<'_>,
253 buf: &[u8],
254 ) -> Poll<io::Result<usize>> {
255 (*self).poll_write(cx, buf)
256 }
257
258 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
259 Poll::Ready(Ok(()))
260 }
261
262 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
263 Poll::Ready(Ok(()))
264 }
265}
266
267impl<T: AsFd + 'static> futures_util::AsyncWrite for PollFd<T>
268where
269 for<'a> &'a T: std::io::Write,
270{
271 fn poll_write(
272 self: Pin<&mut Self>,
273 cx: &mut Context<'_>,
274 buf: &[u8],
275 ) -> Poll<io::Result<usize>> {
276 (*self).poll_write(cx, buf)
277 }
278
279 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
280 Poll::Ready(Ok(()))
281 }
282
283 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
284 Poll::Ready(Ok(()))
285 }
286}