Skip to main content

compio_runtime/fd/async_fd/
mod.rs

1use std::{io, ops::Deref};
2
3use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
4use compio_driver::{
5    AsFd, AsRawFd, BorrowedFd, BufferRef, RawFd, SharedFd, ToSharedFd,
6    op::{BufResultExt, Read, ReadManaged, ReadMulti, ResultTakeBuffer, Write},
7};
8use compio_io::{AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite, util::Splittable};
9use futures_util::{Stream, future::Either};
10#[cfg(unix)]
11use {
12    compio_buf::{IoVectoredBuf, IoVectoredBufMut},
13    compio_driver::op::{ReadVectored, WriteVectored},
14};
15
16use crate::{Attacher, Runtime};
17
18#[cfg(windows)]
19mod windows;
20
21#[cfg(unix)]
22mod unix;
23
24/// Providing implementations for [`AsyncRead`] and [`AsyncWrite`].
25#[derive(Debug)]
26pub struct AsyncFd<T: AsFd> {
27    inner: Attacher<T>,
28}
29
30impl<T: AsFd> AsyncFd<T> {
31    /// Create [`AsyncFd`] and attach the source to the current runtime.
32    pub fn new(source: T) -> io::Result<Self> {
33        Ok(Self {
34            inner: Attacher::new(source)?,
35        })
36    }
37
38    /// Create [`AsyncFd`] without attaching the source.
39    ///
40    /// # Safety
41    ///
42    /// * The user should handle the attachment correctly.
43    /// * `T` should be an owned fd.
44    pub unsafe fn new_unchecked(source: T) -> Self {
45        Self {
46            inner: unsafe { Attacher::new_unchecked(source) },
47        }
48    }
49}
50
51impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {
52    #[inline]
53    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
54        (&*self).read(buf).await
55    }
56
57    #[cfg(unix)]
58    #[inline]
59    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
60        (&*self).read_vectored(buf).await
61    }
62}
63
64impl<T: AsFd + 'static> AsyncRead for &AsyncFd<T> {
65    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
66        let fd = self.inner.to_shared_fd();
67        let op = Read::new(fd, buf);
68        let res = crate::submit(op).await.into_inner();
69        unsafe { res.map_advanced() }
70    }
71
72    #[cfg(unix)]
73    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
74        use compio_driver::op::VecBufResultExt;
75
76        let fd = self.inner.to_shared_fd();
77        let op = ReadVectored::new(fd, buf);
78        let res = crate::submit(op).await.into_inner();
79        unsafe { res.map_vec_advanced() }
80    }
81}
82
83impl<T: AsFd + 'static> AsyncReadManaged for AsyncFd<T> {
84    type Buffer = BufferRef;
85
86    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
87        (&*self).read_managed(len).await
88    }
89}
90
91impl<T: AsFd + 'static> AsyncReadManaged for &AsyncFd<T> {
92    type Buffer = BufferRef;
93
94    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
95        let runtime = Runtime::current();
96        let fd = self.to_shared_fd();
97        let op = ReadManaged::new(fd, &runtime.buffer_pool()?, len)?;
98        let res = runtime.submit(op).await;
99        unsafe { res.take_buffer() }
100    }
101}
102
103impl<T: AsFd + 'static> AsyncFd<T> {
104    /// Identical to [`AsyncReadMulti::read_multi`] but without borrowing `self`
105    /// mutably.
106    #[doc(hidden)]
107    pub fn read_multi_shared(&self, len: usize) -> impl Stream<Item = io::Result<BufferRef>> {
108        let fd = self.to_shared_fd();
109        read_multi(fd, len)
110    }
111}
112
113impl<T: AsFd + 'static> AsyncReadMulti for AsyncFd<T> {
114    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
115        let fd = self.to_shared_fd();
116        read_multi(fd, len)
117    }
118}
119
120impl<T: AsFd + 'static> AsyncReadMulti for &AsyncFd<T> {
121    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
122        let fd = self.to_shared_fd();
123        read_multi(fd, len)
124    }
125}
126
127fn read_multi<T: AsFd + 'static>(
128    fd: SharedFd<T>,
129    len: usize,
130) -> impl Stream<Item = io::Result<BufferRef>> {
131    let runtime = Runtime::current();
132    let pool = runtime.buffer_pool();
133    pool.and_then(|pool| Ok((ReadMulti::new(fd, &pool, len)?, pool)))
134        .map(|(op, pool)| runtime.submit_multi(op).into_managed(pool))
135        .map(Either::Left)
136        .unwrap_or_else(|e| Either::Right(futures_util::stream::once(std::future::ready(Err(e)))))
137}
138
139impl<T: AsFd + 'static> AsyncWrite for AsyncFd<T> {
140    #[inline]
141    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
142        (&*self).write(buf).await
143    }
144
145    #[cfg(unix)]
146    #[inline]
147    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
148        (&*self).write_vectored(buf).await
149    }
150
151    #[inline]
152    async fn flush(&mut self) -> io::Result<()> {
153        (&*self).flush().await
154    }
155
156    #[inline]
157    async fn shutdown(&mut self) -> io::Result<()> {
158        (&*self).shutdown().await
159    }
160}
161
162impl<T: AsFd + 'static> AsyncWrite for &AsyncFd<T> {
163    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
164        let fd = self.inner.to_shared_fd();
165        let op = Write::new(fd, buf);
166        crate::submit(op).await.into_inner()
167    }
168
169    #[cfg(unix)]
170    async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
171        let fd = self.inner.to_shared_fd();
172        let op = WriteVectored::new(fd, buf);
173        crate::submit(op).await.into_inner()
174    }
175
176    async fn flush(&mut self) -> io::Result<()> {
177        Ok(())
178    }
179
180    async fn shutdown(&mut self) -> io::Result<()> {
181        Ok(())
182    }
183}
184
185impl<T: AsFd> IntoInner for AsyncFd<T> {
186    type Inner = SharedFd<T>;
187
188    fn into_inner(self) -> Self::Inner {
189        self.inner.into_inner()
190    }
191}
192
193impl<T: AsFd> AsFd for AsyncFd<T> {
194    fn as_fd(&self) -> BorrowedFd<'_> {
195        self.inner.as_fd()
196    }
197}
198
199impl<T: AsFd> AsRawFd for AsyncFd<T> {
200    fn as_raw_fd(&self) -> RawFd {
201        self.inner.as_fd().as_raw_fd()
202    }
203}
204
205impl<T: AsFd> ToSharedFd<T> for AsyncFd<T> {
206    fn to_shared_fd(&self) -> SharedFd<T> {
207        self.inner.to_shared_fd()
208    }
209}
210
211impl<T: AsFd> Clone for AsyncFd<T> {
212    fn clone(&self) -> Self {
213        Self {
214            inner: self.inner.clone(),
215        }
216    }
217}
218
219impl<T: AsFd> Deref for AsyncFd<T> {
220    type Target = T;
221
222    fn deref(&self) -> &Self::Target {
223        &self.inner
224    }
225}
226
227impl<T: AsFd> Splittable for AsyncFd<T> {
228    type ReadHalf = Self;
229    type WriteHalf = Self;
230
231    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
232        (self.clone(), self)
233    }
234}