compio_fs/
async_fd.rs

1#[cfg(unix)]
2use std::os::fd::FromRawFd;
3#[cfg(windows)]
4use std::os::windows::io::{
5    AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, RawHandle, RawSocket,
6};
7use std::{io, ops::Deref};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10use compio_driver::{
11    AsFd, AsRawFd, BorrowedFd, RawFd, SharedFd, ToSharedFd,
12    op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send},
13};
14use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
15use compio_runtime::{Attacher, BorrowedBuffer, BufferPool};
16#[cfg(unix)]
17use {
18    compio_buf::{IoVectoredBuf, IoVectoredBufMut},
19    compio_driver::op::{RecvVectored, SendVectored},
20};
21
22/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
23/// [`AsyncWrite`].
24#[derive(Debug)]
25pub struct AsyncFd<T: AsFd> {
26    inner: Attacher<T>,
27}
28
29impl<T: AsFd> AsyncFd<T> {
30    /// Create [`AsyncFd`] and attach the source to the current runtime.
31    pub fn new(source: T) -> io::Result<Self> {
32        Ok(Self {
33            inner: Attacher::new(source)?,
34        })
35    }
36
37    /// Create [`AsyncFd`] without attaching the source.
38    ///
39    /// # Safety
40    ///
41    /// The user should handle the attachment correctly.
42    pub unsafe fn new_unchecked(source: T) -> Self {
43        Self {
44            inner: Attacher::new_unchecked(source),
45        }
46    }
47}
48
49impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
50    #[inline]
51    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
52        (&*self).read(buf).await
53    }
54
55    #[cfg(unix)]
56    #[inline]
57    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
58        (&*self).read_vectored(buf).await
59    }
60}
61
62impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
63    type Buffer<'a> = BorrowedBuffer<'a>;
64    type BufferPool = BufferPool;
65
66    async fn read_managed<'a>(
67        &mut self,
68        buffer_pool: &'a Self::BufferPool,
69        len: usize,
70    ) -> io::Result<Self::Buffer<'a>> {
71        (&*self).read_managed(buffer_pool, len).await
72    }
73}
74
75impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
76    type Buffer<'a> = BorrowedBuffer<'a>;
77    type BufferPool = BufferPool;
78
79    async fn read_managed<'a>(
80        &mut self,
81        buffer_pool: &'a Self::BufferPool,
82        len: usize,
83    ) -> io::Result<Self::Buffer<'a>> {
84        let fd = self.to_shared_fd();
85        let buffer_pool = buffer_pool.try_inner()?;
86        let op = RecvManaged::new(fd, buffer_pool, len)?;
87        compio_runtime::submit_with_flags(op)
88            .await
89            .take_buffer(buffer_pool)
90    }
91}
92
93impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
94    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
95        let fd = self.inner.to_shared_fd();
96        let op = Recv::new(fd, buf);
97        compio_runtime::submit(op).await.into_inner().map_advanced()
98    }
99
100    #[cfg(unix)]
101    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
102        let fd = self.inner.to_shared_fd();
103        let op = RecvVectored::new(fd, buf);
104        compio_runtime::submit(op).await.into_inner().map_advanced()
105    }
106}
107
108impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
109    #[inline]
110    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
111        (&*self).write(buf).await
112    }
113
114    #[cfg(unix)]
115    #[inline]
116    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
117        (&*self).write_vectored(buf).await
118    }
119
120    #[inline]
121    async fn flush(&mut self) -> io::Result<()> {
122        (&*self).flush().await
123    }
124
125    #[inline]
126    async fn shutdown(&mut self) -> io::Result<()> {
127        (&*self).shutdown().await
128    }
129}
130
131impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
132    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
133        let fd = self.inner.to_shared_fd();
134        let op = Send::new(fd, buf);
135        compio_runtime::submit(op).await.into_inner()
136    }
137
138    #[cfg(unix)]
139    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
140        let fd = self.inner.to_shared_fd();
141        let op = SendVectored::new(fd, buf);
142        compio_runtime::submit(op).await.into_inner()
143    }
144
145    async fn flush(&mut self) -> io::Result<()> {
146        Ok(())
147    }
148
149    async fn shutdown(&mut self) -> io::Result<()> {
150        Ok(())
151    }
152}
153
154impl<T: AsFd> IntoInner for AsyncFd<T> {
155    type Inner = SharedFd<T>;
156
157    fn into_inner(self) -> Self::Inner {
158        self.inner.into_inner()
159    }
160}
161
162impl<T: AsFd> AsFd for AsyncFd<T> {
163    fn as_fd(&self) -> BorrowedFd<'_> {
164        self.inner.as_fd()
165    }
166}
167
168impl<T: AsFd> AsRawFd for AsyncFd<T> {
169    fn as_raw_fd(&self) -> RawFd {
170        self.inner.as_fd().as_raw_fd()
171    }
172}
173
#[cfg(windows)]
impl<T> AsRawHandle for AsyncFd<T>
where
    T: AsFd + AsRawHandle,
{
    /// Returns the raw handle by calling `as_raw_handle` on the inner value.
    fn as_raw_handle(&self) -> RawHandle {
        let handle = self.inner.as_raw_handle();
        handle
    }
}
180
#[cfg(windows)]
impl<T> AsRawSocket for AsyncFd<T>
where
    T: AsFd + AsRawSocket,
{
    /// Returns the raw socket by calling `as_raw_socket` on the inner value.
    fn as_raw_socket(&self) -> RawSocket {
        let socket = self.inner.as_raw_socket();
        socket
    }
}
187
188impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
189    fn to_shared_fd(&self) -> SharedFd<T> {
190        self.inner.to_shared_fd()
191    }
192}
193
194impl<T: AsFd> Clone for AsyncFd<T> {
195    fn clone(&self) -> Self {
196        Self {
197            inner: self.inner.clone(),
198        }
199    }
200}
201
202#[cfg(unix)]
203impl<T: AsFd + FromRawFd> FromRawFd for AsyncFd<T> {
204    unsafe fn from_raw_fd(fd: RawFd) -> Self {
205        Self::new_unchecked(FromRawFd::from_raw_fd(fd))
206    }
207}
208
#[cfg(windows)]
impl<T: AsFd + FromRawHandle> FromRawHandle for AsyncFd<T> {
    /// Builds the source from `handle` via `T::from_raw_handle` and wraps it
    /// with [`AsyncFd::new_unchecked`], i.e. without attaching it.
    ///
    /// # Safety
    ///
    /// The caller must satisfy the requirements of `T::from_raw_handle` for
    /// `handle` as well as those of [`AsyncFd::new_unchecked`].
    unsafe fn from_raw_handle(handle: RawHandle) -> Self {
        // SAFETY: both calls carry contracts that this function's own
        // `# Safety` section forwards to the caller.
        unsafe { Self::new_unchecked(T::from_raw_handle(handle)) }
    }
}
215
#[cfg(windows)]
impl<T: AsFd + FromRawSocket> FromRawSocket for AsyncFd<T> {
    /// Builds the source from `sock` via `T::from_raw_socket` and wraps it
    /// with [`AsyncFd::new_unchecked`], i.e. without attaching it.
    ///
    /// # Safety
    ///
    /// The caller must satisfy the requirements of `T::from_raw_socket` for
    /// `sock` as well as those of [`AsyncFd::new_unchecked`].
    unsafe fn from_raw_socket(sock: RawSocket) -> Self {
        // SAFETY: both calls carry contracts that this function's own
        // `# Safety` section forwards to the caller.
        unsafe { Self::new_unchecked(T::from_raw_socket(sock)) }
    }
}
222
223impl<T: AsFd> Deref for AsyncFd<T> {
224    type Target = T;
225
226    fn deref(&self) -> &Self::Target {
227        &self.inner
228    }
229}