compio_fs/
async_fd.rs

1#[cfg(unix)]
2use std::os::fd::FromRawFd;
3#[cfg(windows)]
4use std::os::windows::io::{
5    AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, RawHandle, RawSocket,
6};
7use std::{io, ops::Deref};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11    AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd,
12    op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{Attacher, BorrowedBuffer, BufferPool};
16#[cfg(unix)]
17use {
18    compio_buf::{IoVectoredBuf, IoVectoredBufMut},
19    compio_driver::op::{RecvVectored, SendVectored},
20};
21
22/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
23/// [`AsyncWrite`].
24#[derive(Debug)]
25pub struct AsyncFd<T: AsFd> {
26    inner: Attacher<T>,
27}
28
29impl<T: AsFd> AsyncFd<T> {
30    /// Create [`AsyncFd`] and attach the source to the current runtime.
31    pub fn new(source: T) -> io::Result<Self> {
32        Ok(Self {
33            inner: Attacher::new(source)?,
34        })
35    }
36
37    /// Create [`AsyncFd`] without attaching the source.
38    ///
39    /// # Safety
40    ///
41    /// * The user should handle the attachment correctly.
42    /// * `T` should be an owned fd.
43    pub unsafe fn new_unchecked(source: T) -> Self {
44        Self {
45            inner: unsafe { Attacher::new_unchecked(source) },
46        }
47    }
48}
49
50impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
51    #[inline]
52    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
53        (&*self).read(buf).await
54    }
55
56    #[cfg(unix)]
57    #[inline]
58    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
59        (&*self).read_vectored(buf).await
60    }
61}
62
63impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
64    type Buffer<'a> = BorrowedBuffer<'a>;
65    type BufferPool = BufferPool;
66
67    async fn read_managed<'a>(
68        &mut self,
69        buffer_pool: &'a Self::BufferPool,
70        len: usize,
71    ) -> io::Result<Self::Buffer<'a>> {
72        (&*self).read_managed(buffer_pool, len).await
73    }
74}
75
76impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
77    type Buffer<'a> = BorrowedBuffer<'a>;
78    type BufferPool = BufferPool;
79
80    async fn read_managed<'a>(
81        &mut self,
82        buffer_pool: &'a Self::BufferPool,
83        len: usize,
84    ) -> io::Result<Self::Buffer<'a>> {
85        let fd = self.to_shared_fd();
86        let buffer_pool = buffer_pool.try_inner()?;
87        let op = RecvManaged::new(fd, buffer_pool, len)?;
88        compio_runtime::submit_with_flags(op)
89            .await
90            .take_buffer(buffer_pool)
91    }
92}
93
94impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
95    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
96        let fd = self.inner.to_shared_fd();
97        let op = Recv::new(fd, buf);
98        compio_runtime::submit(op).await.into_inner().map_advanced()
99    }
100
101    #[cfg(unix)]
102    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
103        let fd = self.inner.to_shared_fd();
104        let op = RecvVectored::new(fd, buf);
105        compio_runtime::submit(op).await.into_inner().map_advanced()
106    }
107}
108
109impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
110    #[inline]
111    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
112        (&*self).write(buf).await
113    }
114
115    #[cfg(unix)]
116    #[inline]
117    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
118        (&*self).write_vectored(buf).await
119    }
120
121    #[inline]
122    async fn flush(&mut self) -> io::Result<()> {
123        (&*self).flush().await
124    }
125
126    #[inline]
127    async fn shutdown(&mut self) -> io::Result<()> {
128        (&*self).shutdown().await
129    }
130}
131
132impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
133    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
134        let fd = self.inner.to_shared_fd();
135        let op = Send::new(fd, buf);
136        compio_runtime::submit(op).await.into_inner()
137    }
138
139    #[cfg(unix)]
140    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
141        let fd = self.inner.to_shared_fd();
142        let op = SendVectored::new(fd, buf);
143        compio_runtime::submit(op).await.into_inner()
144    }
145
146    async fn flush(&mut self) -> io::Result<()> {
147        Ok(())
148    }
149
150    async fn shutdown(&mut self) -> io::Result<()> {
151        Ok(())
152    }
153}
154
155impl<T: AsFd> IntoInner for AsyncFd<T> {
156    type Inner = SharedFd<T>;
157
158    fn into_inner(self) -> Self::Inner {
159        self.inner.into_inner()
160    }
161}
162
163impl<T: AsFd> AsFd for AsyncFd<T> {
164    fn as_fd(&self) -> BorrowedFd<'_> {
165        self.inner.as_fd()
166    }
167}
168
169impl<T: AsFd> AsRawFd for AsyncFd<T> {
170    fn as_raw_fd(&self) -> RawFd {
171        self.inner.as_fd().as_raw_fd()
172    }
173}
174
/// Windows only: exposes the raw handle of the wrapped source.
#[cfg(windows)]
impl<T> AsRawHandle for AsyncFd<T>
where
    T: AsFd + AsRawHandle,
{
    /// Return the raw handle reported through the inner field.
    fn as_raw_handle(&self) -> RawHandle {
        self.inner.as_raw_handle()
    }
}
181
/// Windows only: exposes the raw socket of the wrapped source.
#[cfg(windows)]
impl<T> AsRawSocket for AsyncFd<T>
where
    T: AsFd + AsRawSocket,
{
    /// Return the raw socket reported through the inner field.
    fn as_raw_socket(&self) -> RawSocket {
        self.inner.as_raw_socket()
    }
}
188
189impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
190    fn to_shared_fd(&self) -> SharedFd<T> {
191        self.inner.to_shared_fd()
192    }
193}
194
195impl<T: AsFd> Clone for AsyncFd<T> {
196    fn clone(&self) -> Self {
197        Self {
198            inner: self.inner.clone(),
199        }
200    }
201}
202
203#[cfg(unix)]
204impl<T: AsFd + FromRawFd> FromRawFd for AsyncFd<T> {
205    unsafe fn from_raw_fd(fd: RawFd) -> Self {
206        unsafe { Self::new_unchecked(FromRawFd::from_raw_fd(fd)) }
207    }
208}
209
/// Windows only: builds an [`AsyncFd`] directly from a raw handle.
#[cfg(windows)]
impl<T: AsFd + FromRawHandle> FromRawHandle for AsyncFd<T> {
    /// Construct `T` from `handle` and wrap it via
    /// [`AsyncFd::new_unchecked`], i.e. without attaching the source.
    ///
    /// # Safety
    ///
    /// `handle` must satisfy the requirements of `T::from_raw_handle`, and the
    /// requirements of [`AsyncFd::new_unchecked`] must hold for the result.
    unsafe fn from_raw_handle(handle: RawHandle) -> Self {
        // SAFETY: the caller guarantees `handle` is valid for
        // `T::from_raw_handle`.
        let source = unsafe { <T as FromRawHandle>::from_raw_handle(handle) };
        // SAFETY: the caller takes on the obligations of `new_unchecked`.
        unsafe { Self::new_unchecked(source) }
    }
}
216
/// Windows only: builds an [`AsyncFd`] directly from a raw socket.
#[cfg(windows)]
impl<T: AsFd + FromRawSocket> FromRawSocket for AsyncFd<T> {
    /// Construct `T` from `sock` and wrap it via [`AsyncFd::new_unchecked`],
    /// i.e. without attaching the source.
    ///
    /// # Safety
    ///
    /// `sock` must satisfy the requirements of `T::from_raw_socket`, and the
    /// requirements of [`AsyncFd::new_unchecked`] must hold for the result.
    unsafe fn from_raw_socket(sock: RawSocket) -> Self {
        // SAFETY: the caller guarantees `sock` is valid for
        // `T::from_raw_socket`.
        let source = unsafe { <T as FromRawSocket>::from_raw_socket(sock) };
        // SAFETY: the caller takes on the obligations of `new_unchecked`.
        unsafe { Self::new_unchecked(source) }
    }
}
223
224impl<T: AsFd> Deref for AsyncFd<T> {
225    type Target = T;
226
227    fn deref(&self) -> &Self::Target {
228        &self.inner
229    }
230}