conch_runtime_pshaw/env/async_io/
tokio.rs

1use crate::env::{AsyncIoEnvironment, SubEnvironment};
2use crate::io::FileDesc;
3use futures_core::future::BoxFuture;
4use std::borrow::Cow;
5use std::io;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7
/// An environment implementation which leverages Tokio to manage async
/// operations on file descriptor handles.
///
/// The type carries no data of its own (it wraps `()`), so every instance is
/// interchangeable with every other.
// `Copy` is left off on purpose; the lint that would suggest adding it is
// silenced rather than followed.
#[allow(missing_copy_implementations)]
#[derive(Clone, Debug, Default)]
pub struct TokioAsyncIoEnv(());

impl TokioAsyncIoEnv {
    /// Create a new environment which always uses the default runtime.
    ///
    /// Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
20
21impl SubEnvironment for TokioAsyncIoEnv {
22    fn sub_env(&self) -> Self {
23        self.clone()
24    }
25}
26
27enum AsyncIo {
28    /// An evented file descriptor registered with tokio.
29    #[cfg(unix)]
30    PollEvented(tokio::io::PollEvented<FileDesc>),
31    /// Evented IO not supported, use a blocking operation
32    File(tokio::fs::File),
33}
34
35impl AsyncIo {
36    fn new(fd: FileDesc) -> Self {
37        match Self::try_as_evented(&fd) {
38            Some(io) => io,
39            None => AsyncIo::File(tokio::fs::File::from_std(convert_to_file(fd))),
40        }
41    }
42
43    #[cfg(not(unix))]
44    fn try_as_evented(_: &FileDesc) -> Option<Self> {
45        None
46    }
47
48    #[cfg(unix)]
49    fn try_as_evented(fd: &FileDesc) -> Option<Self> {
50        use crate::sys::cvt_r;
51        use std::mem;
52        use std::os::unix::io::AsRawFd;
53
54        #[cfg(not(linux))]
55        fn get_mode(fd: &FileDesc) -> io::Result<libc::mode_t> {
56            unsafe {
57                let mut stat: libc::stat = mem::zeroed();
58                cvt_r(|| libc::fstat(fd.as_raw_fd(), &mut stat)).map(|_| stat.st_mode)
59            }
60        }
61
62        #[cfg(linux)]
63        fn get_mode(fd: &FileDesc) -> Result<libc::mode_t> {
64            unsafe {
65                let mut stat: libc::stat64 = mem::zeroed();
66                cvt_r(|| libc::fstat64(fd.as_raw_fd(), &mut stat)).map(|_| stat.st_mode)
67            }
68        }
69
70        let supports_evented_io = get_mode(&fd)
71            .map(|mode| mode & libc::S_IFMT == libc::S_IFREG)
72            .map(|is_regular_file| !is_regular_file);
73
74        match supports_evented_io {
75            Ok(true) => fd
76                .duplicate()
77                .and_then(|mut fd| {
78                    fd.set_nonblock(true)?;
79                    tokio::io::PollEvented::new(fd)
80                })
81                .map(AsyncIo::PollEvented)
82                .ok(),
83
84            _ => None,
85        }
86    }
87}
88
89async fn do_write_all(fd: FileDesc, data: Cow<'_, [u8]>) -> io::Result<()> {
90    match AsyncIo::new(fd) {
91        #[cfg(unix)]
92        AsyncIo::PollEvented(mut fd) => fd.write_all(&*data).await,
93        AsyncIo::File(mut fd) => fd.write_all(&*data).await,
94    }
95}
96
97impl AsyncIoEnvironment for TokioAsyncIoEnv {
98    type IoHandle = FileDesc;
99
100    fn read_all(&mut self, fd: Self::IoHandle) -> BoxFuture<'static, io::Result<Vec<u8>>> {
101        Box::pin(async {
102            let mut data = Vec::new();
103
104            let _read = match AsyncIo::new(fd) {
105                #[cfg(unix)]
106                AsyncIo::PollEvented(mut fd) => fd.read_to_end(&mut data).await?,
107                AsyncIo::File(mut fd) => fd.read_to_end(&mut data).await?,
108            };
109
110            Ok(data)
111        })
112    }
113
114    fn write_all<'a>(
115        &mut self,
116        fd: Self::IoHandle,
117        data: Cow<'a, [u8]>,
118    ) -> BoxFuture<'a, io::Result<()>> {
119        Box::pin(do_write_all(fd, data))
120    }
121
122    fn write_all_best_effort(&mut self, fd: Self::IoHandle, data: Vec<u8>) {
123        let _ = tokio::spawn(async move {
124            let _ = do_write_all(fd, Cow::Owned(data)).await;
125        });
126    }
127}
128
129#[cfg(unix)]
130fn convert_to_file(fd: FileDesc) -> std::fs::File {
131    use std::os::unix::io::{FromRawFd, IntoRawFd};
132
133    unsafe { FromRawFd::from_raw_fd(fd.into_raw_fd()) }
134}
135
/// Turn a `FileDesc` into a `std::fs::File` by moving its raw handle.
#[cfg(windows)]
fn convert_to_file(fd: FileDesc) -> std::fs::File {
    use std::os::windows::io::{FromRawHandle, IntoRawHandle};

    let raw = fd.into_raw_handle();

    // SAFETY: `raw` was just taken out of `fd`, which is consumed by
    // `into_raw_handle`. Assuming `FileDesc` follows the usual `IntoRawHandle`
    // contract of giving up ownership (confirm in `crate::io`), the new `File`
    // is the handle's only owner.
    unsafe { std::fs::File::from_raw_handle(raw) }
}