conch_runtime_pshaw/env/async_io/
tokio.rs1use crate::env::{AsyncIoEnvironment, SubEnvironment};
2use crate::io::FileDesc;
3use futures_core::future::BoxFuture;
4use std::borrow::Cow;
5use std::io;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7
/// An `AsyncIoEnvironment` implementation which drives its reads and writes
/// through tokio.
///
/// The type holds no state of its own. Its single private unit field only
/// keeps callers outside this module from building it with a struct literal,
/// so construction goes through `new` or `Default`.
// `Copy` is deliberately not derived, hence the lint override below.
// NOTE(review): presumably this leaves room to add non-`Copy` state later
// without breaking callers -- confirm with the crate's lint policy.
#[allow(missing_copy_implementations)]
#[derive(Clone, Debug, Default)]
pub struct TokioAsyncIoEnv(());
13
14impl TokioAsyncIoEnv {
15 pub fn new() -> Self {
17 Self(())
18 }
19}
20
21impl SubEnvironment for TokioAsyncIoEnv {
22 fn sub_env(&self) -> Self {
23 self.clone()
24 }
25}
26
27enum AsyncIo {
28 #[cfg(unix)]
30 PollEvented(tokio::io::PollEvented<FileDesc>),
31 File(tokio::fs::File),
33}
34
35impl AsyncIo {
36 fn new(fd: FileDesc) -> Self {
37 match Self::try_as_evented(&fd) {
38 Some(io) => io,
39 None => AsyncIo::File(tokio::fs::File::from_std(convert_to_file(fd))),
40 }
41 }
42
43 #[cfg(not(unix))]
44 fn try_as_evented(_: &FileDesc) -> Option<Self> {
45 None
46 }
47
48 #[cfg(unix)]
49 fn try_as_evented(fd: &FileDesc) -> Option<Self> {
50 use crate::sys::cvt_r;
51 use std::mem;
52 use std::os::unix::io::AsRawFd;
53
54 #[cfg(not(linux))]
55 fn get_mode(fd: &FileDesc) -> io::Result<libc::mode_t> {
56 unsafe {
57 let mut stat: libc::stat = mem::zeroed();
58 cvt_r(|| libc::fstat(fd.as_raw_fd(), &mut stat)).map(|_| stat.st_mode)
59 }
60 }
61
62 #[cfg(linux)]
63 fn get_mode(fd: &FileDesc) -> Result<libc::mode_t> {
64 unsafe {
65 let mut stat: libc::stat64 = mem::zeroed();
66 cvt_r(|| libc::fstat64(fd.as_raw_fd(), &mut stat)).map(|_| stat.st_mode)
67 }
68 }
69
70 let supports_evented_io = get_mode(&fd)
71 .map(|mode| mode & libc::S_IFMT == libc::S_IFREG)
72 .map(|is_regular_file| !is_regular_file);
73
74 match supports_evented_io {
75 Ok(true) => fd
76 .duplicate()
77 .and_then(|mut fd| {
78 fd.set_nonblock(true)?;
79 tokio::io::PollEvented::new(fd)
80 })
81 .map(AsyncIo::PollEvented)
82 .ok(),
83
84 _ => None,
85 }
86 }
87}
88
89async fn do_write_all(fd: FileDesc, data: Cow<'_, [u8]>) -> io::Result<()> {
90 match AsyncIo::new(fd) {
91 #[cfg(unix)]
92 AsyncIo::PollEvented(mut fd) => fd.write_all(&*data).await,
93 AsyncIo::File(mut fd) => fd.write_all(&*data).await,
94 }
95}
96
97impl AsyncIoEnvironment for TokioAsyncIoEnv {
98 type IoHandle = FileDesc;
99
100 fn read_all(&mut self, fd: Self::IoHandle) -> BoxFuture<'static, io::Result<Vec<u8>>> {
101 Box::pin(async {
102 let mut data = Vec::new();
103
104 let _read = match AsyncIo::new(fd) {
105 #[cfg(unix)]
106 AsyncIo::PollEvented(mut fd) => fd.read_to_end(&mut data).await?,
107 AsyncIo::File(mut fd) => fd.read_to_end(&mut data).await?,
108 };
109
110 Ok(data)
111 })
112 }
113
114 fn write_all<'a>(
115 &mut self,
116 fd: Self::IoHandle,
117 data: Cow<'a, [u8]>,
118 ) -> BoxFuture<'a, io::Result<()>> {
119 Box::pin(do_write_all(fd, data))
120 }
121
122 fn write_all_best_effort(&mut self, fd: Self::IoHandle, data: Vec<u8>) {
123 let _ = tokio::spawn(async move {
124 let _ = do_write_all(fd, Cow::Owned(data)).await;
125 });
126 }
127}
128
129#[cfg(unix)]
130fn convert_to_file(fd: FileDesc) -> std::fs::File {
131 use std::os::unix::io::{FromRawFd, IntoRawFd};
132
133 unsafe { FromRawFd::from_raw_fd(fd.into_raw_fd()) }
134}
135
/// Converts an owned `FileDesc` into a `std::fs::File` wrapping the same
/// underlying Windows handle.
#[cfg(windows)]
fn convert_to_file(fd: FileDesc) -> std::fs::File {
    use std::os::windows::io::{FromRawHandle, IntoRawHandle};

    let raw = fd.into_raw_handle();
    // SAFETY: `into_raw_handle` hands over ownership of the handle, so the
    // returned `File` becomes its sole owner.
    unsafe { std::fs::File::from_raw_handle(raw) }
}