compio_runtime/fd/async_fd/
mod.rs1use std::{io, ops::Deref};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5 AsFd, AsRawFd, BorrowedFd, BufferRef, RawFd, SharedFd, ToSharedFd,
6 op::{BufResultExt, Read, ReadManaged, ReadMulti, ResultTakeBuffer, Write},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite, util::Splittable};
9use futures_util::{Stream, future::Either};
10#[cfg(unix)]
11use {
12 compio_buf::{IoVectoredBuf, IoVectoredBufMut},
13 compio_driver::op::{ReadVectored, WriteVectored},
14};
15
16use crate::{Attacher, Runtime};
17
18#[cfg(windows)]
19mod windows;
20
21#[cfg(unix)]
22mod unix;
23
24#[derive(Debug)]
26pub struct AsyncFd<T: AsFd> {
27 inner: Attacher<T>,
28}
29
30impl<T: AsFd> AsyncFd<T> {
31 pub fn new(source: T) -> io::Result<Self> {
33 Ok(Self {
34 inner: Attacher::new(source)?,
35 })
36 }
37
38 pub unsafe fn new_unchecked(source: T) -> Self {
45 Self {
46 inner: unsafe { Attacher::new_unchecked(source) },
47 }
48 }
49}
50
51impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
52 #[inline]
53 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
54 (&*self).read(buf).await
55 }
56
57 #[cfg(unix)]
58 #[inline]
59 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
60 (&*self).read_vectored(buf).await
61 }
62}
63
64impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
65 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
66 let fd = self.inner.to_shared_fd();
67 let op = Read::new(fd, buf);
68 let res = crate::submit(op).await.into_inner();
69 unsafe { res.map_advanced() }
70 }
71
72 #[cfg(unix)]
73 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
74 use compio_driver::op::VecBufResultExt;
75
76 let fd = self.inner.to_shared_fd();
77 let op = ReadVectored::new(fd, buf);
78 let res = crate::submit(op).await.into_inner();
79 unsafe { res.map_vec_advanced() }
80 }
81}
82
83impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
84 type Buffer = BufferRef;
85
86 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
87 (&*self).read_managed(len).await
88 }
89}
90
91impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
92 type Buffer = BufferRef;
93
94 async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
95 let runtime = Runtime::current();
96 let fd = self.to_shared_fd();
97 let op = ReadManaged::new(fd, &runtime.buffer_pool()?, len)?;
98 let res = runtime.submit(op).await;
99 unsafe { res.take_buffer() }
100 }
101}
102
103impl<T: AsFd + 'static> AsyncFd<T> {
104 #[doc(hidden)]
107 pub fn read_multi_shared(&self, len: usize) -> impl Stream<Item = io::Result<BufferRef>> {
108 let fd = self.to_shared_fd();
109 read_multi(fd, len)
110 }
111}
112
113impl<T: AsFd + 'static> AsyncReadMulti for AsyncFd<T> {
114 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
115 let fd = self.to_shared_fd();
116 read_multi(fd, len)
117 }
118}
119
120impl<T: AsFd + 'static> AsyncReadMulti for &AsyncFd<T> {
121 fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
122 let fd = self.to_shared_fd();
123 read_multi(fd, len)
124 }
125}
126
127fn read_multi<T: AsFd + 'static>(
128 fd: SharedFd<T>,
129 len: usize,
130) -> impl Stream<Item = io::Result<BufferRef>> {
131 let runtime = Runtime::current();
132 let pool = runtime.buffer_pool();
133 pool.and_then(|pool| Ok((ReadMulti::new(fd, &pool, len)?, pool)))
134 .map(|(op, pool)| runtime.submit_multi(op).into_managed(pool))
135 .map(Either::Left)
136 .unwrap_or_else(|e| Either::Right(futures_util::stream::once(std::future::ready(Err(e)))))
137}
138
139impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
140 #[inline]
141 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
142 (&*self).write(buf).await
143 }
144
145 #[cfg(unix)]
146 #[inline]
147 async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
148 (&*self).write_vectored(buf).await
149 }
150
151 #[inline]
152 async fn flush(&mut self) -> io::Result<()> {
153 (&*self).flush().await
154 }
155
156 #[inline]
157 async fn shutdown(&mut self) -> io::Result<()> {
158 (&*self).shutdown().await
159 }
160}
161
162impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
163 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
164 let fd = self.inner.to_shared_fd();
165 let op = Write::new(fd, buf);
166 crate::submit(op).await.into_inner()
167 }
168
169 #[cfg(unix)]
170 async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
171 let fd = self.inner.to_shared_fd();
172 let op = WriteVectored::new(fd, buf);
173 crate::submit(op).await.into_inner()
174 }
175
176 async fn flush(&mut self) -> io::Result<()> {
177 Ok(())
178 }
179
180 async fn shutdown(&mut self) -> io::Result<()> {
181 Ok(())
182 }
183}
184
185impl<T: AsFd> IntoInner for AsyncFd<T> {
186 type Inner = SharedFd<T>;
187
188 fn into_inner(self) -> Self::Inner {
189 self.inner.into_inner()
190 }
191}
192
193impl<T: AsFd> AsFd for AsyncFd<T> {
194 fn as_fd(&self) -> BorrowedFd<'_> {
195 self.inner.as_fd()
196 }
197}
198
199impl<T: AsFd> AsRawFd for AsyncFd<T> {
200 fn as_raw_fd(&self) -> RawFd {
201 self.inner.as_fd().as_raw_fd()
202 }
203}
204
205impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
206 fn to_shared_fd(&self) -> SharedFd<T> {
207 self.inner.to_shared_fd()
208 }
209}
210
211impl<T: AsFd> Clone for AsyncFd<T> {
212 fn clone(&self) -> Self {
213 Self {
214 inner: self.inner.clone(),
215 }
216 }
217}
218
219impl<T: AsFd> Deref for AsyncFd<T> {
220 type Target = T;
221
222 fn deref(&self) -> &Self::Target {
223 &self.inner
224 }
225}
226
227impl<T: AsFd> Splittable for AsyncFd<T> {
228 type ReadHalf = Self;
229 type WriteHalf = Self;
230
231 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
232 (self.clone(), self)
233 }
234}